續上篇IPC-Pipes
上篇的example3中,我用named pipe(FIFO)來實現client和server的通訊。
因爲client每次發完一個包後,都會sleep(2),因此幾乎沒有資源衝突和同步的問題。
可是,若是將sleep(2)去掉,會出現什麼結果呢?
咱們加入如下這個函數,在每次接受到包以後,比較接收到的包和原始發送的包。
static int is_pkt_ok(struct cs_packet_t *pkt_recv, struct cs_packet_t *pkt_sent)
{
if (pkt_recv->cli_pid != pkt_sent->cli_pid)
return 0;
if ( strcmp(pkt_recv->action, pkt_sent->action) )
return 0;
char *p = pkt_recv->data;
char *q = pkt_sent->data;
while (*p != '\0')
{
if ( toupper(*p) != toupper(*q) )
return 0;
p++;
q++;
}
return 1;
}
結果會發現,若是沒有sleep(2),即,client發送包的速度很快,server處的處理就不對了。
緣由是:
1. 若是procA打開一個fifo來read, procB打開同一個fifo來寫,而且直接關掉,此時,procA將讀到0個字節。也就是說,writer端關閉proc,reader端會讀到0個字節(其實,這是個比較方便的特性,由於reader就有辦法知道遠方的writer是否關閉了pipe)。如下將有個示例程序證實這一點。
2. 在server端,對於每一個packet新建一個線程來處理,這就有可能出現如下狀況:一個線程處理完包,write回結果,但尚未關閉pipe時,另一個線程啓動,這個線程處理的是同一個client的包,因而它將與上一個線程共用一個FIFO來發送結果。因而就出現了衝突。client正等着接受第二個線程處理的結果,此時第一個線程關閉pipe的writer端的這個操做,也會使得client中的read再也不block而返回0字節。
其實,當多個線程訪問同一個資源而互相又不知道時,你基本上已經有一個bug了。
此處,FIFO是共有資源,兩個線程同時訪問,但又不加控制,結果不言而喻。
解決方案1:
client若是讀到0字節,說明這是一個無效的read,從新read。
結果:出現奇怪的現象。要麼server和client卡住,要麼client一直在讀pipe,可是每次讀的結果都是0個字節。此時,server程序的VSZ高的出奇,以下:
22728 0.1 0.3 3124528 3684 pts/1 S+ 22:49 0:00 ./server
這個方案應該是解決了以上問題,出現奇怪現象的緣由應該是虛擬內存使用太高了。虛擬內存使用太高的緣由是,不斷的建立線程,在線程結束後,主線程沒有pthread_join它,致使該線程相關的資源不能被操做系統回收,一旦這樣的線程多了以後,虛擬內存就使用太高,致使程序沒法繼續運行了。
解決方案2:
server中添加pthread_join,等待該處理線程結束,回首資源。這樣確定沒有問題,可是,全部的處理就串行化了。可見這個server設計的有爛!
不過這個比解決方案1好點,由於它至少解決了問題。app
解決方案3:函數
利用thread attibutes,將處理packet的線程設置爲detached thread。這樣,當線程結束時,系統會自動回收線程的資源。這個方案比2要好一點。 oop
pthread_attr_t attr;this
pthread_t thread;操作系統
pthread_attr_init(&attr);.net
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);線程
pthread_create(&thread, &attr, &thread_function, NULL);設計
pthread_attr_destroy(&attr);
解決方案4:
對每一個packet進行線程的建立和銷燬,簡直有點蠢!
server能夠將packet放在一個FIFO中,而處理packet的線程不斷從FIFO中取出packet進行處理。
對FIFO的操做應該由某個函數進行操做,這個函數應該是re-entrant的,因此要注意資源的同步。
處理packet的線程數量,若是是一個,那麼在client較少的狀況下應該沒有問題。由於一個線程來處理幾個client的請求仍是說的過去的;若是是多個,那麼就要注意了,由於幾個處理線程擁有共同的資源,即,命名管道,須要進行仔細的處理,防止出現資源競爭。
如下代碼實現server中的packet queue。client代碼無需修改。
另外,目前代碼中有如下幾個地方須要改進。
1. client增長uer command interface,由此能夠寫腳本或者c程序來控制client的數量和行爲。
2. server中的packet要增長同步機制,避免競爭狀態出現。
3. server中增長線程池機制。
4. server中增長對client的管理線程,主要用於管理server->client的pipe,避免處理packet的線程反覆開啓和關閉同一個client的FIFO。
5. packet queue的長度要能夠根據目前負載動態改變。
(下一步先實現client中的user command interface)
/**
* server_with_queue.c
*
* client/server using named pipes (FIFO)
* server implementation with a packet queue
**/
#include "common.h"
#include <pthread.h>
#define DEBUG_SERVER 1
#if DEBUG_SERVER
static void print_pkt(struct cs_packet_t *pkt)
{
printf("==========packet=================\n");
printf("client pid = %d \n", pkt->cli_pid);
printf("client action = %s \n", pkt->action);
printf("client data = %s \n", pkt->data);
printf("==========end pkt================\n");
}
#else
static void print_pkt(struct cs_packet_t *pkt)
{
return;
}
#endif
/**packet queue implementation; all operations on packet queue should be synchronized to avoid race condition*/
#define PKT_QUEUE_SIZE 32
/**
* struct: pkt_queue_t -- FIFO queue of (struct cs_packet_t*)
* @size: size of the queue, we use power of 2 size because it make modulus operation easier and faster
* @head : head of this circular queue
* @tail: tail of this circular queue
* @pkt_queue: stores PKT_QUEUE_SIZE elements of type (struct cs_packet_t*)
*
**/
struct pkt_queue_t
{
int size;
int head;
int tail;
struct cs_packet_t **queue;
};
static struct pkt_queue_t pkt_queue; /* global variable for this server */
/**
* function: init_pkt_queue
*
* the return value must be checked
**/
static int init_pkt_queue(struct pkt_queue_t *pkt_queue)
{
pkt_queue->size = PKT_QUEUE_SIZE;
pkt_queue->head = 0;
pkt_queue->tail = 0;
pkt_queue->queue = (struct cs_packet_t **)malloc( pkt_queue->size*sizeof(struct cs_packet_t*) );
if (pkt_queue == NULL)
{
perror("allocating packet queue failed");
return -1;
}
for (int i=0; i<pkt_queue->size; i++)
{
pkt_queue->queue[i] = (struct cs_packet_t*)malloc( sizeof(struct cs_packet_t) );
if (pkt_queue->queue[i] == NULL)
{
perror("allocating packet queue failed");
return -1;
}
}
return 0;
}
static int is_empty(struct pkt_queue_t *pkt_queue)
{
return (pkt_queue->head == pkt_queue->tail);
}
static int is_full(struct pkt_queue_t *pkt_queue)
{
int tmp = (pkt_queue->tail+1) & (pkt_queue->size - 1);
return (pkt_queue->head == tmp);
}
static void clone_packet(struct cs_packet_t *pkt_dst, const struct cs_packet_t *pkt_src)
{
pkt_dst->cli_pid = pkt_src->cli_pid;
strcpy(pkt_dst->action, pkt_src->action);
strcpy(pkt_dst->data, pkt_src->data);
}
/**
* function: in_pkt_queue -- add an element into this FIFO
* @return: 0 for success, -1 for fail
*
* we don't simply put the pointer into the queue, but copies the value of every field
* the return value must be checked
**/
static int in_pkt_queue(struct pkt_queue_t *pkt_queue, const struct cs_packet_t *pkt_to_add)
{
if (is_full(pkt_queue))
{
return -1;
}
clone_packet(pkt_queue->queue[pkt_queue->tail], pkt_to_add);
pkt_queue->tail = (pkt_queue->tail+1) & (pkt_queue->size - 1);
return 0;
}
/**
* function: out_pkt_queue --- remove an element out of the FIFO
* @pkt_dst: the target packet to copy the info to
* @return: 0 for success, -1 for fail(this happens when the FIFO is empty)
*
* the return value must be checked
**/
static int out_pkt_queue(struct pkt_queue_t *pkt_queue, struct cs_packet_t *pkt_dst)
{
if (is_empty(pkt_queue))
{
return -1;
}
clone_packet(pkt_dst, pkt_queue->queue[pkt_queue->head]);
pkt_queue->head = (pkt_queue->head+1)&(pkt_queue->size-1);
return 0;
}
static int destroy_pkt_queue(struct pkt_queue_t *pkt_queue)
{
for (int i=0; i<pkt_queue->size; i++)
{
free(pkt_queue->queue[i]);
}
free(pkt_queue->queue);
return 0;
}
/**
* function: thread_handle_requests
*
* thread which handles client requests, sends back the result to client through cli_pipe_%d pipe and then exits
* client1 --(pkt1)--> | | --> pkt1 (thread1)
* client2 --(pkt2)--> | --> packet queue --> | --> pkt2 (thread2)
* client3 --(pkt3)--> | | --> pkt3 (thread3)
*
* N threads may loop to take the requests out of the queue and then handles it.
* This avoids the overhead of thead creation and destruction.
**/
void *thread_handle_requests(void *arg) /* arg is NULL */
{
struct cs_packet_t *pkt = (struct cs_packet_t*)malloc(sizeof(struct cs_packet_t));
if (pkt == NULL)
{
perror("Not Enough Memory In Heap!");
exit(-1);
}
/* main loop in this thread, never exits voluntarily */
for (;;)
{
/* loop to get a packet from packet queue */
for (;;)
{
if ( out_pkt_queue(&pkt_queue, pkt) < 0 ) /* the queue is empty */
{
sleep(1);
}
else
{
break;
}
}
print_pkt(pkt);
char client_fifo[32]; /* name of this client's fifo, cli_fifo_%d */
int client_fifo_fd;
memset(client_fifo, 0, sizeof(client_fifo));
sprintf(client_fifo, CLIENT_FIFO_NAME, pkt->cli_pid);
client_fifo_fd = open(client_fifo, O_WRONLY);
if (client_fifo_fd == -1)
{
perror("open client pipe failed");
exit(-1);
}
if (!strcmp(pkt->action, "upcase"))
{
char *tmp_char_ptr = pkt->data;
while (*tmp_char_ptr)
{
*tmp_char_ptr = toupper(*tmp_char_ptr);
tmp_char_ptr++;
}
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
else if (!strcmp(pkt->action, "downcase"))
{
char *tmp_char_ptr = pkt->data;
while (*tmp_char_ptr)
{
*tmp_char_ptr = tolower(*tmp_char_ptr);
tmp_char_ptr++;
}
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
else
{
sprintf(pkt->data, "Action %s not supported", pkt->action);
int ret = write(client_fifo_fd, pkt, sizeof(struct cs_packet_t));
if (ret < 0)
perror("write failed \n");
else
printf("<<server>> ---- write %d bytes \n", ret);
}
close(client_fifo_fd);
}
free(pkt);
return NULL;
}
void main()
{
struct cs_packet_t pkt;
int server_fifo_fd;
pthread_t th;
void *result;
/* make server fifo */
mkfifo(SERVER_FIFO_NAME, 0777);
server_fifo_fd = open(SERVER_FIFO_NAME, O_RDONLY);
if (server_fifo_fd == -1)
{
perror("create server fifo failed");
exit(-1);
}
/* init packet queue */
int ret = init_pkt_queue(&pkt_queue);
if (ret < 0)
{
perror("allocating pkt_queue failed");
exit(-1);
}
/* create a thread to handle requests */
ret = pthread_create(&th, NULL, thread_handle_requests, (void*)&pkt);
if (ret < 0)
{
perror("create thread failed");
exit(-1);
}
/* read from server fifo */
for (;;)
{
int read_res = read(server_fifo_fd, &pkt, sizeof(pkt));
if (read_res > 0)
{
/* add the packet into the packet queue [pkt_queue] */
for (;;)
{
ret = in_pkt_queue(&pkt_queue, &pkt);
if (ret < 0) /* the queue is full */
{
sleep(1);
}
else
{
break;
}
}
} /* if (read_res > 0) */
} /* loop to listen to cs fifo */
destroy_pkt_queue(&pkt_queue);
}
server