【LINUX/UNIX網絡編程】之使用消息隊列,信號量和命名管道實現的多進程服務器(多人羣聊系統)

RT,使用消息隊列,信號量和命名管道實現的多人羣聊系統。編程

本學期Linux、unix網絡編程的第三個做業。ubuntu

先上實驗要求:centos

實驗三  多進程服務器服務器

【實驗目的】網絡

一、熟練掌握進程的建立與終止方法;架構

二、熟練掌握進程間通訊方法;併發

二、應用套接字函數完成多進程服務器,實現服務器與客戶端的信息交互。socket

【實驗學時】函數

    4學時工具

【實驗內容】

經過一個服務器實現最多5個客戶之間的信息羣發。

服務器顯示客戶的登陸與退出;

客戶鏈接後首先發送客戶名稱,以後發送羣聊信息;

客戶輸入bye表明退出,在線客戶能顯示其餘客戶的登陸於退出。

 

任務分析:

 

實現提示:

一、服務器端:

服務器進程稱之爲主進程,主進程建立一個轉發子進程和最多5個通訊子進程。

主進程與轉發子進程之間:

        信號量(初值5,主進程接受一個客戶鏈接後執行P操做判斷是否超過5,轉發子進程有一個客戶退出後執行V操做,併發消息隊列標識符)

        命名管道SERVER(轉發子進程將可用的消息隊列標識符寫入管道,主進程從管道中讀取消息隊列標識符)

轉發子進程與通訊子進程之間:

        命名管道CLIENT(通訊子進程向命名管道寫入客戶端發來的消息,轉發子進程從管道中讀取消息併發送給對應的客戶端)

        消息隊列(轉發子進程將客戶發來的信息經過消息隊列發送給每一個通訊子進程)

(1)主進程:

從轉發子進程獲取一個可用的消息隊列標識符;

接收客戶鏈接請求,若是鏈接數超過最大鏈接數,向客戶發送退出標誌,不然發送OK標誌;

每接受一個鏈接,建立一個通訊子進程並將鏈接socket、消息隊列標識符、客戶地址傳遞給通訊子進程。

(2)通訊子進程:

建立一個子進程負責從消息隊列中讀取消息,發送給客戶。

通訊子進程負責接收客戶發來信息,經過命名管道CLIENT發送給轉發子進程;

若信息爲用戶名,附帶消息隊列、客戶地址發送給轉發子進程;

若信息爲退出,終止子進程,程序結束

(3)轉發子進程:

建立5個消息隊列;

維護客戶信息表:消息隊列、客戶名、客戶IP、客戶端口、狀態。

從命名管道CLIENT中讀取通訊子進程發來的消息,消息類型爲:用戶名、退出及通常信息;

若爲用戶名,依據消息隊列在更新客戶信息表,狀態爲可用;

若爲通常信息,將信息轉換後寫入可用客戶的消息隊列,等待其餘通訊子進程讀取;

若爲退出,在客戶信息表中狀態設爲不可用,執行信號量V操做,並將可用客戶的消息隊列標識符寫入到命名管道SERVER;

二、客戶端:

根據用戶從終端輸入的服務器IP地址及端口號鏈接到相應的服務器;

鏈接成功後,先發送客戶名稱;

建立一個子進程負責接收服務器發來的信息,並顯示;

主進程循環從終端輸入信息,並將信息發送給服務器;

當發送給服務器爲bye後,關閉子進程,程序退出。

 

架構看起來很複雜,咱們能夠繪製一下流程圖方便理清思路。


 

在word裏面截圖不是很清晰啊。。。

開始寫代碼吧:首先clientmsg.h,它定義了一些消息的操做符(OP)和CLIENTMSG這個結構體(用於服務器和客戶端之間傳遞消息)

 1 //CLIENTMSG between server and client
 2 #ifndef _clientmsg
 3 #define _clientmsg
 4 
 5 //USER MSG EXIT for OP of CLIENTMSG
 6 #define EXIT -1
 7 #define USER 1
 8 #define MSG 2
 9 #define OK 3
10 
11 #ifndef CMSGLEN
12 #define CMSGLEN 100
13 #endif
14 
15 struct CLIENTMSG{
16     int OP;
17     char username[20];
18     char buf[CMSGLEN];
19 };
20 
21 #endif

而後實現一下servermsg.h,用於服務器內部的轉發子進程和通訊子進程之間的消息傳遞。

 1 //SERVERMSG for communicate to translate
 2 //MESSAGE for translate to communicate
 3 #ifndef _servermsg
 4 #define _servermsg
 5 
 6 #include <netinet/in.h>
 7 #include "clientmsg.h"
 8 
 9 
10 #ifndef CMSGLEN
11 #define CMSGLEN 100
12 #endif
13 
14 
15 struct SERVERMSG{
16     int OP;
17     char username[20];
18     char buf[CMSGLEN];
19     struct sockaddr_in client;
20     int stat;
21     int qid;
22 };
23 
24 struct MESSAGE{
25     long msgtype;
26     struct SERVERMSG msg;
27 };
28 
29 #endif

因爲須要操做信號量,因此將一些信號量的操做作成函數

semaphore.h

 1 #ifndef _semaphore
 2 #define _semaphore
 3 
 4 union semun
 5 {
 6     int val; 
 7     struct semid_ds *buf;
 8     unsigned short *array; 
 9 };
10 
11 int CreateSem(key_t key,int value);
12 int Sem_P(int semid);
13 int Sem_V(int semid);
14 int GetvalueSem(int semid);
15 void DestroySem(int semid);
16 
17 
18 #endif

對函數的實現:semaphore.c

 1 #include <stdlib.h>
 2 #include <fcntl.h>
 3 #include <sys/sem.h>
 4 #include "semaphore.h"
 5 
 6 int CreateSem(key_t key,int value)
 7 {
 8     union semun sem;
 9     int semid;
10     sem.val=value;
11     semid=semget(key,1,IPC_CREAT);
12     if (semid==-1){
13             perror("semget error");    exit(1);
14     }
15     semctl(semid,0,SETVAL,sem);
16     return semid;
17 }
18 
19 int Sem_P(int semid)
20 {
21     struct sembuf sops={0,-1,IPC_NOWAIT};
22     return (semop(semid,&sops,1));
23 }
24 
25 int Sem_V(int semid)
26 {
27     struct sembuf sops={0,+1,IPC_NOWAIT};
28     return (semop(semid,&sops,1));
29 }
30 
31 int GetvalueSem(int semid)
32 {
33     union semun sem;
34     return semctl(semid,0,GETVAL,sem);
35 }
36 void DestroySem(int semid)
37 {
38     union semun sem;
39     sem.val=0;
40 
41     semctl(semid,0,IPC_RMID,sem);
42 }

 

接下來是很是重要的服務器端實現(裏面有不少調試信息,比較懶沒有刪掉,直接在裏面註釋掉了。)

server.c

  1 #include <stdio.h>
  2 #include <string.h>
  3 #include <sys/socket.h>
  4 #include <netinet/in.h>
  5 #include <stdlib.h>
  6 #include <sys/types.h>
  7 #include <sys/wait.h>
  8 #include <sys/stat.h>
  9 #include <unistd.h>
 10 #include <fcntl.h>
 11 #include <sys/ipc.h>
 12 #include "semaphore.h"
 13 #include "servermsg.h"
 14 
 15 
 16 void trans_process(int semid);
 17 void communicate_process(int connetfd,int qid,struct sockaddr_in client);
 18 
 19 int main(){
 20 
 21     struct sockaddr_in server;
 22     struct sockaddr_in client;
 23     int listenfd,connetfd;
 24     char ip[20];
 25     int port;
 26     int addrlen;
 27     struct CLIENTMSG clientMsg;
 28     int ret,status;
 29     /*---------------------socket-------------------*/
 30     if((listenfd = socket(AF_INET,SOCK_STREAM,0))== -1){
 31         perror("socket() error\n");
 32         exit(1);
 33     }
 34 
 35     /*----------------------IO-----------------------*/
 36     printf("Please input the ip:\n");
 37     scanf("%s",ip);
 38     printf("Please input the port:\n");
 39     scanf("%d",&port);
 40 
 41     /*---------------------bind----------------------*/
 42     bzero(&server,sizeof(server));
 43     server.sin_family = AF_INET;
 44     server.sin_port = htons(port);
 45     server.sin_addr.s_addr = inet_addr(ip);
 46     if(bind(listenfd,(struct sockaddr *)&server,sizeof(server))== -1){
 47         perror("bind() error\n");
 48         exit(1);
 49     }
 50 
 51     /*----------------------listen-------------------*/
 52     if (listen(listenfd,5)== -1){
 53         perror("listen() error\n");
 54         exit(1);
 55     }
 56 
 57     //建立命名管道
 58     unlink("SERVER");
 59     mkfifo("SERVER",O_CREAT);
 60     int rd = open("SERVER",O_RDONLY|O_NONBLOCK);
 61     int semid;
 62     key_t k =  ftok(".",'b');
 63     semid = CreateSem(k,5);
 64     pid_t pid_1,pid_2;
 65     pid_1 = fork();
 66     if(pid_1 == 0){
 67         trans_process(semid);
 68         exit(0);
 69     }
 70     else if(pid_1 > 0){
 71         while(1){
 72             addrlen = sizeof(client);
 73             if((connetfd = accept(listenfd,(struct sockaddr *)&client,&addrlen))== -1){
 74             perror("accept() error\n");
 75             exit(1);
 76             }
 77             ret = Sem_P(semid);
 78             if(ret == 0){
 79                 int qid;
 80                 read(rd,&qid,sizeof(qid));
 81                 //printf("qid1:%d\n",qid );
 82                 pid_2 = fork();
 83                 if (pid_2 > 0){
 84                     close(connetfd);
 85                     waitpid(pid_2,&status,WNOHANG);
 86                     continue;
 87                 }
 88                 else if(pid_2 == 0){
 89                     communicate_process(connetfd,qid,client);
 90                     exit(0);
 91                 }
 92                 else {
 93                     perror("the second fork error\n");
 94                 }
 95             }
 96             else {
 97                 clientMsg.OP = EXIT;
 98                 send(connetfd,&clientMsg,sizeof(clientMsg),0);
 99                 close(connetfd);
100             }
101             waitpid(pid_1,&status,WNOHANG);
102 
103         }
104     }
105     else {
106         perror("first time fork error\n");
107     }
108     /*----------------------close-------------------*/
109     close(connetfd);
110     close(listenfd);
111 
112     return 0;
113 }
114 
115 
116 /*----------------------------函數實現區----------------------------*/
117 void trans_process(int semid){
118     struct SERVERMSG ent[5];
119     struct MESSAGE sendMsg;
120     struct SERVERMSG msg;
121     int i;
122     for(i=0;i<5;i++){
123         ent[i].stat = 0;
124     }
125     int wfd = open("SERVER",O_WRONLY|O_NONBLOCK);
126     for(i=0;i<5;i++){
127         key_t key = ftok(".",(char)i+102);
128         ent[i].qid = msgget(key,IPC_CREAT);
129         write(wfd,&ent[i].qid,sizeof(ent[i].qid));
130     }
131     unlink("CLIENT");
132     mkfifo("CLIENT",O_CREAT);
133     int rfd = open("CLIENT",O_RDONLY|O_NONBLOCK);
134     int len;
135     while(1){
136         bzero(&msg,sizeof(msg));
137         len = read(rfd,&msg,sizeof(msg));
138         //printf(" %d,%s ,%s\n",msg.OP,msg.username,msg.buf );
139         //sleep(3);
140         if(len > 0){
141             if(msg.OP == USER){
142                 for(i=0;i<5;i++){
143                     if(ent[i].qid == msg.qid){
144                         bcopy(msg.username,ent[i].username,strlen(msg.username));
145                         ent[i].client = msg.client;
146                         ent[i].stat = 1;
147                         break;
148                     }
149                 }
150              }
151             else if(msg.OP == EXIT){
152                 for(i=0;i<5;i++){
153                     if(ent[i].qid == msg.qid){
154                         ent[i].stat = 0;
155                         write(wfd,&ent[i].qid,sizeof(ent[i].qid));
156                         Sem_V(semid);
157                         break;
158                     }
159                 }
160              }
161              //bzero(&sendMsg,sizeof(sendMsg));
162              sendMsg.msg = msg;
163              for(i=0;i<5;i++){
164                  if(ent[i].stat == 1){
165                      printf("stat 1...\n");
166                      int m_len = sizeof(msg);
167                      int sta=msgsnd(ent[i].qid,&sendMsg,len,0);
168                      //printf("flag:%d\n",sta );
169                  }
170              }
171         }
172         else {
173             continue;
174         }
175     } 
176 
177 }
178 
179 void communicate_process(int connetfd,int qid,struct sockaddr_in client){
180     struct CLIENTMSG sendMsg;
181     struct CLIENTMSG recvMsg;
182     struct MESSAGE server_Msg;
183     struct SERVERMSG client_sndMsg;
184     struct SERVERMSG msg;
185     int status;
186     int wfd = open("CLIENT",O_WRONLY|O_NONBLOCK);
187     pid_t pid;
188     pid = fork();
189     if(pid < 0){
190         perror("communicate_process fork error\n");
191     }
192     else if (pid == 0){
193         bzero(&sendMsg,sizeof(sendMsg));
194         sendMsg.OP = OK;
195         send(connetfd,&sendMsg,sizeof(sendMsg),0);
196         while(1){
197             int m_len = sizeof(msg);
198             bzero(&server_Msg,sizeof(server_Msg));
199             int sta=msgrcv(qid,&server_Msg,m_len,0,0);
200             //printf("flag:%d\n",sta );
201             //printf("send..%d,%s,%s\n",server_Msg.msg.OP,server_Msg.msg.username,server_Msg.msg.buf );
202             bzero(&sendMsg,sizeof(sendMsg));
203             bcopy(server_Msg.msg.username,sendMsg.username,strlen(server_Msg.msg.username));
204             sendMsg.OP = server_Msg.msg.OP;
205             bcopy(server_Msg.msg.buf,sendMsg.buf,strlen(server_Msg.msg.buf));
206             //printf("send..%d,%s,%s\n",sendMsg.OP,sendMsg.username,sendMsg.buf );
207             send(connetfd,&sendMsg,sizeof(sendMsg),0);
208         }
209     }
210     else{
211         while(1){
212             bzero(&recvMsg,sizeof(recvMsg));
213             int len =recv(connetfd,&recvMsg,sizeof(recvMsg),0);
214             if(len > 0){
215                 if(recvMsg.OP == USER){
216                     printf("user %s login from ip:%s,port:%d\n",recvMsg.username,inet_ntoa(client.sin_addr),ntohs(client.sin_port) );
217                     client_sndMsg.OP = USER;
218                 }
219                 else if(recvMsg.OP == EXIT){
220                     printf("user %s is logout\n",recvMsg.username );
221                     client_sndMsg.OP = EXIT;
222                     write(wfd,&client_sndMsg,sizeof(client_sndMsg));
223                     break;
224                 }
225                 else if(recvMsg.OP == MSG){
226                     client_sndMsg.OP = MSG;
227                 }
228                 bzero(&client_sndMsg,sizeof(client_sndMsg));
229                 bcopy(recvMsg.username,client_sndMsg.username,strlen(recvMsg.username));
230                 bcopy(recvMsg.buf,client_sndMsg.buf,strlen(recvMsg.buf));
231                 client_sndMsg.client = client;
232                 //printf("qid2:%d\n",qid );
233                 client_sndMsg.qid = qid;
234                 client_sndMsg.OP = recvMsg.OP;
235                 write(wfd,&client_sndMsg,sizeof(client_sndMsg));
236                 
237             }
238             else{
239                 continue;
240             }
241         }
242         kill(pid,SIGKILL);
243         waitpid(pid,&status,WNOHANG);
244         close(wfd);
245         close(connetfd);
246     }
247 }

 

寫出了服務端,就能夠很是容易的寫出客戶端了。

client.c

 1 #include <stdio.h>
 2 #include <string.h>
 3 #include <sys/socket.h>
 4 #include <netinet/in.h>
 5 #include <stdlib.h>
 6 #include <sys/types.h>
 7 #include <sys/wait.h>
 8 #include <signal.h>
 9 #include <unistd.h>
10 #include "clientmsg.h"
11 
12 int main(){
13     int sockfd;
14     char ip[20];
15     int port;
16     int status;
17     pid_t pid;
18     struct sockaddr_in server;
19     struct CLIENTMSG clientMsg;
20 
21     /*---------------------socket---------------------*/
22     if((sockfd = socket(AF_INET,SOCK_STREAM,0))== -1){
23         perror("socket error\n");
24         exit(1);
25     }
26 
27     /*---------------------connect--------------------*/
28     printf("Please input the ip:\n");
29     scanf("%s",ip);
30     printf("Please input the port:\n");
31     scanf("%d",&port);
32     bzero(&server,sizeof(server));
33     server.sin_family = AF_INET;
34     server.sin_port = htons(port);
35     inet_aton(ip,&server.sin_addr);
36     if(connect(sockfd,(struct sockaddr *)&server,sizeof(server))== -1){
37         perror("connect() error\n");
38         exit(1);
39     }
40     recv(sockfd,&clientMsg,sizeof(clientMsg),0);
41     if(clientMsg.OP == OK){
42         int len;
43         pid = fork();
44         if(pid == 0){
45             while(1){
46                 bzero(&clientMsg,sizeof(clientMsg));
47                 len =recv(sockfd,&clientMsg,sizeof(clientMsg),0);
48                 if(len > 0){
49                     if(clientMsg.OP ==USER){
50                         printf("the user %s is login.\n",clientMsg.username );
51                     }
52                     else if(clientMsg.OP == EXIT){
53                         printf("the user %s is logout.\n",clientMsg.username);
54                     }
55                     else if(clientMsg.OP == MSG){
56                         printf("%s: %s\n",clientMsg.username,clientMsg.buf );
57                     }
58                 }    
59             }
60             exit(EXIT_SUCCESS); 
61         }
62         else if(pid > 0){
63             printf("Please input the username:\n");
64             scanf("%s",clientMsg.username);
65             clientMsg.OP = USER;
66             send(sockfd,&clientMsg,sizeof(clientMsg),0);
67             while(1){
68                 clientMsg.OP = MSG;
69                 scanf("%s",clientMsg.buf);
70                 if(strcmp("bye",clientMsg.buf) == 0){
71                     clientMsg.OP = EXIT;
72                     send(sockfd,&clientMsg,sizeof(clientMsg),0);
73                     break;
74                 }
75                 send(sockfd,&clientMsg,sizeof(clientMsg),0);
76 
77             }
78             kill(pid,SIGKILL);
79             waitpid(pid,&status,WNOHANG);
80         }
81         else{
82             perror("fork error!\n");
83         }
84     }
85     else{
86         printf("以達到最大鏈接數!\n");
87     }
88     /*------------------------close--------------------------*/
89     close(sockfd);
90 
91     return 0;
92 }

最後是makefile:

main:server.o client.o semaphore.o
    gcc server.o semaphore.o -oserver
    gcc client.o -oclient
server.o:server.c semaphore.h clientmsg.h servermsg.h
    gcc -c server.c
client.o:client.c clientmsg.h
    gcc -c client.c
semaphore.o:semaphore.h semaphore.c
    gcc -c semaphore.c
clean:
    rm -rf *.o

 

 

下面上一下演示過程:(測試環境,Red Hat Enterprise Linux 6 + centos系Linux,ubuntu下可能會有些問題。)

首先先把服務端啓動開來,爲了方便測試,這裏直接使用的是127.0.0.1的localhost。

 

而後啓動兩個客戶端用來測試,在用戶登陸的時候客戶端會有消息提醒。服務端會有日誌打印輸出客戶端的名字和登陸ip、端口。

客戶能夠發送消息了,如圖發送與接收均正常。能夠同時啓動<=5個客戶端進行羣聊,這裏爲了簡單演示只是啓動了2個。(修改信號量代碼能夠實現n多個客戶的同時登錄):

 

輸入bye之後便可退出聊天並下線。當有客戶下線的時候,在線的客戶端會收到下線提醒,客戶端會有日誌打印輸出。

 

 

這個實驗內容前先後後花了我2天才寫完,剛開始沒有弄清楚這一整套的工做機制與流程,寫起來非常吃力,程序就是各類調不通。原本都想放棄了,可是後來仍是咬咬牙堅持了一下來,飯要一口一口吃,程序要一點一點的寫,萬事不能操之過急,寫代碼必定要心平氣和,頭腦清晰。因爲gdb調試工具用的不是很熟練,只能在程序裏面一段一段的print變量來DEBUG,非常辛苦啊。

相關文章
相關標籤/搜索