IPC研究(3) -- pipe續

續上篇IPC-Pipes

上篇的example3中,我用named pipe(FIFO)來實現client和server的通訊。
因爲client每次發完一個包後,都會sleep(2),因此幾乎沒有資源衝突和同步的問題。
可是,若是將sleep(2)去掉,會出現什麼結果呢?

咱們加入如下這個函數,在每次接受到包以後,比較接收到的包和原始發送的包。
static int is_pkt_ok(struct cs_packet_t *pkt_recv, struct cs_packet_t *pkt_sent)
{
        if (pkt_recv->cli_pid != pkt_sent->cli_pid)
                return 0;
        if ( strcmp(pkt_recv->action, pkt_sent->action) )
                return 0;
        char *p = pkt_recv->data;
        char *q = pkt_sent->data;
        while (*p != '\0')
     {
                if ( toupper(*p) != toupper(*q) )
                        return 0;
                p++;
                q++;
        }
        return 1;
}
結果會發現,若是沒有sleep(2),即,client發送包的速度很快,server處的處理就不對了。
緣由是:
1. 若是procA打開一個fifo來read, procB打開同一個fifo來寫,而且直接關掉,此時,procA將讀到0個字節。也就是說,writer端關閉proc,reader端會讀到0個字節(其實,這是個比較方便的特性,由於reader就有辦法知道遠方的writer是否關閉了pipe)。如下將有個示例程序證實這一點。

2. 在server端,對於每一個packet新建一個線程來處理,這就有可能出現如下狀況:一個線程處理完包,write回結果,但尚未關閉pipe時,另一個線程啓動,這個線程處理的是同一個client的包,因而它將與上一個線程共用一個FIFO來發送結果。因而就出現了衝突。client正等着接受第二個線程處理的結果,此時第一個線程關閉pipe的writer端的這個操做,也會使得client中的read再也不block而返回0字節。

其實,當多個線程訪問同一個資源而互相又不知道時,你基本上已經有一個bug了。
此處,FIFO是共有資源,兩個線程同時訪問,但又不加控制,結果不言而喻。

解決方案1:
client若是讀到0字節,說明這是一個無效的read,從新read。
結果:出現奇怪的現象。要麼server和client卡住,要麼client一直在讀pipe,可是每次讀的結果都是0個字節。此時,server程序的VSZ高的出奇,以下:
22728  0.1  0.3 3124528 3684 pts/1    S+   22:49   0:00 ./server
這個方案應該是解決了以上問題,出現奇怪現象的緣由應該是虛擬內存使用太高了。虛擬內存使用太高的緣由是,不斷的建立線程,在線程結束後,主線程沒有pthread_join它,致使該線程相關的資源不能被操做系統回收,一旦這樣的線程多了以後,虛擬內存就使用太高,致使程序沒法繼續運行了。

解決方案2:
server中添加pthread_join,等待該處理線程結束,回首資源。這樣確定沒有問題,可是,全部的處理就串行化了。可見這個server設計的有爛!
不過這個比解決方案1好點,由於它至少解決了問題。app

解決方案3:函數

利用thread attibutes,將處理packet的線程設置爲detached thread。這樣,當線程結束時,系統會自動回收線程的資源。這個方案比2要好一點。 oop

pthread_attr_t attr;this

pthread_t thread;操作系統

pthread_attr_init(&attr);.net

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);線程

pthread_create(&thread, &attr, &thread_function, NULL);設計

pthread_attr_destroy(&attr);

解決方案4:
對每一個packet進行線程的建立和銷燬,簡直有點蠢!
server能夠將packet放在一個FIFO中,而處理packet的線程不斷從FIFO中取出packet進行處理。
對FIFO的操做應該由某個函數進行操做,這個函數應該是re-entrant的,因此要注意資源的同步。
處理packet的線程數量,若是是一個,那麼在client較少的狀況下應該沒有問題。由於一個線程來處理幾個client的請求仍是說的過去的;若是是多個,那麼就要注意了,由於幾個處理線程擁有共同的資源,即,命名管道,須要進行仔細的處理,防止出現資源競爭。

如下代碼實現server中的packet queue。client代碼無需修改。
另外,目前代碼中有如下幾個地方須要改進。
1. client增長uer command interface,由此能夠寫腳本或者c程序來控制client的數量和行爲。
2. server中的packet要增長同步機制,避免競爭狀態出現。
3. server中增長線程池機制。
4. server中增長對client的管理線程,主要用於管理server->client的pipe,避免處理packet的線程反覆開啓和關閉同一個client的FIFO。
5. packet queue的長度要能夠根據目前負載動態改變。
(下一步先實現client中的user command interface)

/**
 * server_with_queue.c
 *
 * client/server using named pipes (FIFO)
 * server implementation with a packet queue
 **/

#include "common.h"
#include <pthread.h>

#define DEBUG_SERVER 1

#if DEBUG_SERVER
static void print_pkt(struct cs_packet_t *pkt)
{
    printf("==========packet=================\n");
    printf("client pid = %d \n", pkt->cli_pid);
    printf("client action = %s \n", pkt->action);
    printf("client data = %s \n", pkt->data);
    printf("==========end pkt================\n");
}

#else
static void print_pkt(struct cs_packet_t *pkt)
{
    return;
}

#endif

/**packet queue implementation; all operations on packet queue should be synchronized to avoid race condition*/
#define PKT_QUEUE_SIZE 32

/**
 * struct: pkt_queue_t -- FIFO queue of (struct cs_packet_t*)
 * @size: size of the queue, we use power of 2 size because it make modulus operation easier and faster
 * @head : head of this circular queue
 * @tail: tail of this circular queue
 * @pkt_queue: stores PKT_QUEUE_SIZE elements of type (struct cs_packet_t*)
 *
 **/
struct pkt_queue_t
{
    int size;
    int head;
    int tail;
    struct cs_packet_t **queue;
};

static struct pkt_queue_t pkt_queue; /* global variable for this server */

/**
 * function: init_pkt_queue
 *
 * the return value must be checked
 **/
static int init_pkt_queue(struct pkt_queue_t *pkt_queue)
{
    pkt_queue->size = PKT_QUEUE_SIZE;
    pkt_queue->head = 0;
    pkt_queue->tail = 0;
    pkt_queue->queue = (struct cs_packet_t **)malloc( pkt_queue->size*sizeof(struct cs_packet_t*) );
    if (pkt_queue == NULL)
    {
        perror("allocating packet queue failed");
        return -1;
    }
    for (int i=0; i<pkt_queue->size; i++)
    {
        pkt_queue->queue[i] = (struct cs_packet_t*)malloc( sizeof(struct cs_packet_t) );
        if (pkt_queue->queue[i] == NULL)
        {
            perror("allocating packet queue failed");
            return -1;
        }
    }
    return 0;
}

static int is_empty(struct pkt_queue_t *pkt_queue)
{
    return (pkt_queue->head == pkt_queue->tail);
}

static int is_full(struct pkt_queue_t *pkt_queue)
{
    int tmp = (pkt_queue->tail+1) & (pkt_queue->size - 1);
    return (pkt_queue->head == tmp);
}

static void clone_packet(struct cs_packet_t *pkt_dst, const struct cs_packet_t *pkt_src)
{
    pkt_dst->cli_pid = pkt_src->cli_pid;
    strcpy(pkt_dst->action, pkt_src->action);
    strcpy(pkt_dst->data, pkt_src->data);
}

/**
 * function: in_pkt_queue -- add an element into this FIFO
 * @return: 0 for success, -1 for fail
 *
 * we don't simply put the pointer into the queue, but copies the value of every field
 * the return value must be checked
 **/
static int in_pkt_queue(struct pkt_queue_t *pkt_queue, const struct cs_packet_t *pkt_to_add)
{
    if (is_full(pkt_queue))
    {
        return -1;
    }
    clone_packet(pkt_queue->queue[pkt_queue->tail], pkt_to_add);
    pkt_queue->tail = (pkt_queue->tail+1) & (pkt_queue->size - 1);
    return 0;
}

/**
 * function: out_pkt_queue  --- remove an element out of the FIFO
 * @pkt_dst: the target packet to copy the info to
 * @return: 0 for success, -1 for fail(this happens when the FIFO is empty)
 *
 * the return value must be checked
 **/
static int out_pkt_queue(struct pkt_queue_t *pkt_queue, struct cs_packet_t *pkt_dst)
{
    if (is_empty(pkt_queue))
    {
        return -1;
    }
    clone_packet(pkt_dst, pkt_queue->queue[pkt_queue->head]);
    pkt_queue->head = (pkt_queue->head+1)&(pkt_queue->size-1);
    return 0;
}

static int destroy_pkt_queue(struct pkt_queue_t *pkt_queue)
{
    for (int i=0; i<pkt_queue->size; i++)
    {
        free(pkt_queue->queue[i]);
    }
    free(pkt_queue->queue);
    return 0;
}

/**
 * function: thread_handle_requests
 *
 * thread which handles client requests, sends back the result to client through cli_pipe_%d pipe and then exits
 * client1 --(pkt1)--> |                      | --> pkt1 (thread1)
 * client2 --(pkt2)--> | --> packet queue --> | --> pkt2 (thread2)
 * client3 --(pkt3)--> |                      | --> pkt3 (thread3)
 *
 * N threads may loop to take the requests out of the queue and then handles it.
 * This avoids the overhead of thead creation and destruction.
 **/
void *thread_handle_requests(void *arg) /* arg is NULL */
{
    struct cs_packet_t *pkt = (struct cs_packet_t*)malloc(sizeof(struct cs_packet_t));
    if (pkt == NULL)
    {
        perror("Not Enough Memory In Heap!");
        exit(-1);
    }

    /* main loop in this thread, never exits voluntarily */
    for (;;)
    {
        /* loop to get a packet from packet queue */
        for (;;)
        {
            if ( out_pkt_queue(&pkt_queue, pkt) < 0 ) /* the queue is empty */
            {
                sleep(1);
            }
            else
            {
                break;
            }
        }
        print_pkt(pkt);

        char client_fifo[32];    /* name of this client's fifo, cli_fifo_%d */
        int client_fifo_fd;
        memset(client_fifo, 0, sizeof(client_fifo));
        sprintf(client_fifo, CLIENT_FIFO_NAME, pkt->cli_pid);
        client_fifo_fd = open(client_fifo, O_WRONLY);

        if (client_fifo_fd == -1)
        {
            perror("open client pipe failed");
            exit(-1);
        }

        if (!strcmp(pkt->action, "upcase"))
        {
            char *tmp_char_ptr = pkt->data;
            while (*tmp_char_ptr)
            {
                *tmp_char_ptr = toupper(*tmp_char_ptr);
                tmp_char_ptr++;
            }
            int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
            if (ret < 0)
                perror("write failed \n");
            else
                printf("<<server>> ---- write %d bytes \n", ret);
        }
        else if (!strcmp(pkt->action, "downcase"))
        {
            char *tmp_char_ptr = pkt->data;
            while (*tmp_char_ptr)
            {
                *tmp_char_ptr = tolower(*tmp_char_ptr);
                tmp_char_ptr++;
            }
            int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
            if (ret < 0)
                perror("write failed \n");
            else
                printf("<<server>> ---- write %d bytes \n", ret);
        }
        else
        {
            sprintf(pkt->data, "Action %s not supported", pkt->action);
            int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
            if (ret < 0)
                perror("write failed \n");
            else
                printf("<<server>> ---- write %d bytes \n", ret);
        }
        close(client_fifo_fd);
   
    }
    free(pkt);
    return NULL;
}


void main()
{
    struct cs_packet_t pkt;
    int server_fifo_fd;
    pthread_t th;
    void *result;
    /* make server fifo */
    mkfifo(SERVER_FIFO_NAME, 0777);
    server_fifo_fd = open(SERVER_FIFO_NAME, O_RDONLY);
    if (server_fifo_fd == -1)
    {
        perror("create server fifo failed");
        exit(-1);
    }

    /* init packet queue */
    int ret = init_pkt_queue(&pkt_queue);
    if (ret < 0)
    {
        perror("allocating pkt_queue failed");
        exit(-1);
    }
    /* create a thread to handle requests */
    ret = pthread_create(&th, NULL, thread_handle_requests, (void*)&pkt);
    if (ret < 0)
    {
        perror("create thread failed");
        exit(-1);
    }
    /* read from server fifo */
    for (;;)
    {
        int read_res = read(server_fifo_fd, &pkt, sizeof(pkt));
        if (read_res > 0)
        {
            /* add the packet into the packet queue [pkt_queue] */
            for (;;)
            {
                ret = in_pkt_queue(&pkt_queue, &pkt);
                if (ret < 0) /* the queue is full */
                {
                    sleep(1);
                }
                else
                {
                    break;
                }
            }
        } /* if (read_res > 0) */
    }      /* loop to listen to cs fifo */
    destroy_pkt_queue(&pkt_queue);
}



server

相關文章
相關標籤/搜索