Socket:讀寫處理及連接斷開的檢測

作爲進程間通訊及網絡通訊的一種重要技術,socket編程在實際開發中經常被用到。關於socket編程的一般步驟,這裏不再贅述,相關資料和文章很多,google/baidu即可。

本文主要探討如何更好地進行socket讀寫處理,以及如何檢測連接斷開。

首先,有以下幾點需要注意:

  1. 對於全雙工的socket,同時讀寫是沒問題的。比如,一個socket程序有兩個線程,一個線程對socket進行讀操作(recv/read),另一個線程對socket進行寫操作(send/write),這裏不需要進行互斥操作或做臨界區保護。
  2. 在Unix系統下,對一個對端已經關閉的socket調用兩次write,第二次將會產生SIGPIPE信號,該信號的默認處理動作是終止進程。爲了防止在這種情況下導致進程退出,我們需要屏蔽該信號的默認處理動作。有兩種方法:a) 在程序開頭調用signal(SIGPIPE, SIG_IGN),忽略SIGPIPE信號;b) 調用send函數時使用MSG_NOSIGNAL標誌位,使該次發送不產生SIGPIPE信號。當然,雖然我們忽略了SIGPIPE,但errno還是會被設置爲EPIPE。因此,我們可以根據這一點,按照自己的情況進行相應的處理。
  3. 對文件描述符進行select/poll操作:a) 如果select/poll返回值大於0,此時進行recv/read,而recv/read返回0,表明連接關閉;b) recv/read返回-1,並且errno爲ECONNRESET、EBADF、EPIPE、ENOTSOCK之一時,也表明連接關閉。
  4. 對文件描述符進行send/write操作,如果send/write返回-1,並且errno爲ECONNRESET、EBADF、EPIPE、ENOTSOCK之一時,也表明連接關閉。


下面是一個demo程序,server端代碼(server.c)爲:
socket

#define _GNU_SOURCE
#include <stdio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <poll.h>
#include <pthread.h>
#include <errno.h>

#define UNIX_PATH_MAX 108
#define SOCK_PATH "/tmp/test.sock"

/*
 * Send exactly len bytes from buf over the socket fd, retrying partial sends
 * and transient failures until the whole message has been handed to the
 * kernel.
 *
 * Every send() uses MSG_NOSIGNAL, so a closed peer is reported through errno
 * (EPIPE) instead of raising SIGPIPE, and MSG_DONTWAIT, so a full socket
 * buffer is reported as EAGAIN instead of blocking the caller.
 *
 * Parameters:
 *   fd  - connected socket descriptor.
 *   buf - start of the data to send.
 *   len - number of bytes to send; len <= 0 means there is nothing to send.
 *
 * Returns 0 when the whole message was sent (or len <= 0), and 1 when send()
 * failed with an error that retrying cannot fix, i.e. the connection must be
 * treated as unusable by the caller.
 */
int sock_write(int fd, char* buf, int len)
{
    char err_str[128];
    struct pollfd pfd;
    ssize_t res;
    int err_num;

    /* Looping on the remaining length (instead of looping "until done")
     * makes a zero-length request return at once: send() answers 0 for it,
     * which would otherwise never satisfy any exit condition. */
    while (len > 0) {
        res = send(fd, buf, (size_t)len, MSG_NOSIGNAL | MSG_DONTWAIT);
        if (res >= 0) {
            /* Partial or complete send: skip what the kernel accepted. */
            buf += res;
            len -= (int)res;
            continue;
        }

        /* Save errno before any other call can overwrite it. */
        err_num = errno;

        if (EINTR == err_num) {
            /* Interrupted by a signal before anything was sent: retry. */
            continue;
        }
        if (EAGAIN == err_num || EWOULDBLOCK == err_num) {
            /* Socket buffer is full. Wait until it is writable again, but
             * at most 10 ms so a stalled peer cannot park us indefinitely
             * between retries. The poll() result is irrelevant: the next
             * send() reports whatever state the socket is in. */
            pfd.fd = fd;
            pfd.events = POLLOUT;
            pfd.revents = 0;
            (void)poll(&pfd, 1, 10);
            continue;
        }

        /* ECONNRESET, EBADF, EPIPE, ENOTSOCK and every other errno are
         * permanent for this descriptor; retrying them would only spin. */
        printf("send error:%s!\n", strerror_r(err_num, err_str, sizeof(err_str)));
        return 1;
    }

    return 0;
}

/*
 * Writer thread entry point: sends a fixed "\r\n"-terminated test message on
 * the connected socket once per second until sock_write() reports that the
 * connection is gone.
 *
 * fd points to an int holding the connected socket descriptor; the value is
 * copied once when the thread starts.
 *
 * Always returns NULL.
 */
void* write_handle(void* fd)
{
    char buffer[256];
    char* end_flag = "\r\n";
    int connection_fd = *(int*)fd;
    int len;

    len = snprintf(buffer, sizeof(buffer), "buffer data ends with a end flag%s", end_flag);
    if (len < 0) {
        /* Formatting failed: there is nothing meaningful to send. */
        return NULL;
    }
    if (len >= (int)sizeof(buffer)) {
        /* snprintf() returns the untruncated length; clamp it to what is
         * actually stored so we never read past the buffer. */
        len = (int)sizeof(buffer) - 1;
    }

    while (0 == sock_write(connection_fd, buffer, len)) {
        sleep(1);
    }

    /* A start routine's return value is the thread's exit status, so it must
     * be defined rather than left to falling off the end. */
    return NULL;
}

/*
 * Demo server: listens on the Unix-domain stream socket SOCK_PATH and serves
 * one client at a time.
 *
 * For each accepted connection a writer thread (write_handle) sends a test
 * message every second, while this thread polls the socket, discards whatever
 * the client sends, and watches for the connection being closed. When the
 * connection is gone the writer is stopped and the next client is accepted.
 *
 * Returns -1 if the listening socket cannot be set up, 0 once accept() fails.
 */
int main(void)
{
    struct sockaddr_un address;
    int socket_fd, connection_fd;
    socklen_t address_length;

    socket_fd = socket(PF_UNIX, SOCK_STREAM, 0);
    if (socket_fd < 0) {
        printf("%s:socket() failed\n", SOCK_PATH);
        return -1;
    }

    /* Remove a stale socket file left by a previous run, otherwise bind()
     * fails with EADDRINUSE. */
    unlink(SOCK_PATH);
    memset(&address, 0, sizeof(address));
    address.sun_family = AF_UNIX;
    snprintf(address.sun_path, sizeof(address.sun_path), "%s", SOCK_PATH);
    if (bind(socket_fd, (struct sockaddr *)&address, sizeof(address)) != 0) {
        printf("%s:bind() failed\n", SOCK_PATH);
        close(socket_fd);
        return -1;
    }
    if (listen(socket_fd, 0) != 0) {
        printf("%s:listen() failed\n", SOCK_PATH);
        close(socket_fd);
        return -1;
    }

    for (;;) {
        pthread_t w_thread;
        struct pollfd pfd;
        char buffer[512];
        int nbytes;
        int cnt;
        int res;
        int err_num;
#ifdef DEBUG
        char str[128];
#endif

        /* accept() reads this value as the size of the address buffer, so it
         * has to be (re)initialised before every call. */
        address_length = sizeof(address);
        connection_fd = accept(socket_fd, (struct sockaddr *)&address, &address_length);
        if (connection_fd < 0) {
            break;
        }

        printf("\n\n%s:accept a connection!\n", SOCK_PATH);

        if (pthread_create(&w_thread, NULL, write_handle, &connection_fd) != 0) {
            printf("%s:pthread_create() failed\n", SOCK_PATH);
            close(connection_fd);
            continue;
        }

        pfd.fd = connection_fd;
        pfd.events = POLLIN | POLLHUP | POLLRDNORM;
        pfd.revents = 0;

        for (;;) {
            res = poll(&pfd, 1, 500);
            if (res < 0) {
                if (EINTR == errno) {
                    continue;
                }
                /* Unexpected poll() failure: back off before retrying so the
                 * loop cannot spin at full speed. */
#ifdef DEBUG
                printf("Error: poll return -1!!!\n");
#endif
                sleep(5);
                continue;
            }

            /* Peek without consuming: with a positive poll() result there is
             * either data waiting or the peer has closed the connection. */
            cnt = recv(connection_fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT);
            if (0 == cnt) {
                if (res > 0) {
                    /* Readable but zero bytes: orderly shutdown by the peer. */
#ifdef DEBUG
                    printf("connection disconnect 1!\n");
#endif
                    break;
                }
            } else if (-1 == cnt) {
                err_num = errno;
#ifdef DEBUG
                printf("recv error:%s!\n", strerror_r(err_num, str, sizeof(str)));
#endif
                if (ECONNRESET == err_num || EBADF == err_num ||
                    EPIPE == err_num || ENOTSOCK == err_num) {
#ifdef DEBUG
                    printf("connection disconnect 2!\n");
#endif
                    break;
                }
                /* Anything else (e.g. EAGAIN after a poll timeout) is not a
                 * disconnect: keep polling. */
            } else {
                /* Discard everything received from the client. */
                while ((nbytes = recv(connection_fd, buffer, sizeof(buffer) - 1, MSG_DONTWAIT)) > 0) {
                    buffer[nbytes] = 0;
#ifdef DEBUG
                    printf("buffer:%s\n", buffer);
#endif
                }
#ifdef DEBUG
                if (0 == nbytes) {
                    printf("All received!\n");
                }
#endif
            }
        }

        /* Stop and reap the writer BEFORE closing the descriptor: otherwise
         * it could keep sending on a descriptor number that has been closed
         * (or already reused), and an unjoined thread leaks its resources. */
        pthread_cancel(w_thread);
        pthread_join(w_thread, NULL);
        close(connection_fd);
    }

    close(socket_fd);
    unlink(SOCK_PATH);
    return 0;
}

client端代碼(client.c)爲:

#define _GNU_SOURCE
#include <stdio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <poll.h>
#include <errno.h> 

#define UNIX_PATH_MAX 108
#define SOCK_PATH "/tmp/test.sock"

/*
 * Demo client: connects to the Unix-domain stream socket SOCK_PATH, prints
 * everything the server sends, and detects when the connection is closed.
 *
 * Whenever creating the socket or connecting fails, or the connection is
 * found to be closed, the client waits 5 seconds and starts over with a new
 * socket. The outer loop never terminates on its own.
 */
int main(void)
{
    struct sockaddr_un address;
    struct pollfd pfd;
    int socket_fd, res;

    while (1) {
        socket_fd = socket(PF_UNIX, SOCK_STREAM, 0);
        if (socket_fd < 0) {
            printf("%s:socket() failed\n", SOCK_PATH);
            sleep(5);
            continue;
        }

        memset(&address, 0, sizeof(address));
        address.sun_family = AF_UNIX;
        snprintf(address.sun_path, sizeof(address.sun_path), "%s", SOCK_PATH);
        if (connect(socket_fd, (struct sockaddr *)&address, sizeof(address)) != 0) {
            printf("%s:connect() failed\n", SOCK_PATH);
            close(socket_fd);
            sleep(5);
            continue;
        }
#ifdef DEBUG
        printf("connect success!\n");
#endif

        pfd.fd = socket_fd;
        pfd.events = POLLIN | POLLHUP | POLLRDNORM;
        pfd.revents = 0;

        while (1) {
            char buffer[512];
            int cnt, nbytes, err_num;

            /* Use poll() to be notified about socket status changes. */
            res = poll(&pfd, 1, 60000);
            if (res < 0) {
                if (EINTR == errno) {
                    continue;
                }
                /* Unexpected poll() failure: back off before retrying so the
                 * loop cannot spin at full speed. */
                sleep(5);
                continue;
            }

            /* Peek without consuming: with a positive poll() result there is
             * either data waiting or the peer has closed the connection. */
            cnt = recv(socket_fd, buffer, sizeof(buffer) - 1, MSG_PEEK | MSG_DONTWAIT);
            if (-1 == cnt) {
                err_num = errno;
                if (ECONNRESET == err_num || EBADF == err_num ||
                    EPIPE == err_num || ENOTSOCK == err_num) {
                    break;
                }
                /* Anything else (e.g. EAGAIN after a poll timeout) is not a
                 * disconnect: keep polling. */
            } else if (0 == cnt) {
                if (res > 0) {
                    /* Readable but zero bytes: the server closed the
                     * connection. */
                    break;
                }
            } else {
                /* Drain and print everything that is currently available. */
                while ((nbytes = recv(socket_fd, buffer, sizeof(buffer) - 1, MSG_DONTWAIT)) > 0) {
                    buffer[nbytes] = 0;
                    printf("buffer:%s\n", buffer);
                }
            }
        }

        close(socket_fd);
#ifdef DEBUG
        printf("server disconnect!\n");
#endif
        /* Sleep 5 seconds, then reconnect. */
        sleep(5);
    }

    return 0;
}
相關文章
相關標籤/搜索