RT,使用消息隊列,信號量和命名管道實現的多人羣聊系統。編程
本學期Linux、unix網絡編程的第三個做業。ubuntu
先上實驗要求:centos
實驗三 多進程服務器服務器
【實驗目的】網絡
一、熟練掌握進程的建立與終止方法;架構
二、熟練掌握進程間通訊方法;併發
二、應用套接字函數完成多進程服務器,實現服務器與客戶端的信息交互。socket
【實驗學時】函數
4學時工具
【實驗內容】
經過一個服務器實現最多5個客戶之間的信息羣發。
服務器顯示客戶的登陸與退出;
客戶鏈接後首先發送客戶名稱,以後發送羣聊信息;
客戶輸入bye表明退出,在線客戶能顯示其餘客戶的登陸於退出。
任務分析:
實現提示:
一、服務器端:
服務器進程稱之爲主進程,主進程建立一個轉發子進程和最多5個通訊子進程。
主進程與轉發子進程之間:
信號量(初值5,主進程接受一個客戶鏈接後執行P操做判斷是否超過5,轉發子進程有一個客戶退出後執行V操做,併發消息隊列標識符)
命名管道SERVER(轉發子進程將可用的消息隊列標識符寫入管道,主進程從管道中讀取消息隊列標識符)
轉發子進程與通訊子進程之間:
命名管道CLIENT(通訊子進程向命名管道寫入客戶端發來的消息,轉發子進程從管道中讀取消息併發送給對應的客戶端)
消息隊列(轉發子進程將客戶發來的信息經過消息隊列發送給每一個通訊子進程)
(1)主進程:
從轉發子進程獲取一個可用的消息隊列標識符;
接收客戶鏈接請求,若是鏈接數超過最大鏈接數,向客戶發送退出標誌,不然發送OK標誌;
每接受一個鏈接,建立一個通訊子進程並將鏈接socket、消息隊列標識符、客戶地址傳遞給通訊子進程。
(2)通訊子進程:
建立一個子進程負責從消息隊列中讀取消息,發送給客戶。
通訊子進程負責接收客戶發來信息,經過命名管道CLIENT發送給轉發子進程;
若信息爲用戶名,附帶消息隊列、客戶地址發送給轉發子進程;
若信息爲退出,終止子進程,程序結束
(3)轉發子進程:
建立5個消息隊列;
維護客戶信息表:消息隊列、客戶名、客戶IP、客戶端口、狀態。
從命名管道CLIENT中讀取通訊子進程發來的消息,消息類型爲:用戶名、退出及通常信息;
若爲用戶名,依據消息隊列在更新客戶信息表,狀態爲可用;
若爲通常信息,將信息轉換後寫入可用客戶的消息隊列,等待其餘通訊子進程讀取;
若爲退出,在客戶信息表中狀態設爲不可用,執行信號量V操做,並將可用客戶的消息隊列標識符寫入到命名管道SERVER;
二、客戶端:
根據用戶從終端輸入的服務器IP地址及端口號鏈接到相應的服務器;
鏈接成功後,先發送客戶名稱;
建立一個子進程負責接收服務器發來的信息,並顯示;
主進程循環從終端輸入信息,並將信息發送給服務器;
當發送給服務器爲bye後,關閉子進程,程序退出。
架構看起來很複雜,咱們能夠繪製一下流程圖方便理清思路。
在word裏面截圖不是很清晰啊。。。
開始寫代碼吧:首先clientmsg.h,它定義了一些消息的操做符(OP)和CLIENTMSG這個結構體(用於服務器和客戶端之間傳遞消息)
1 //CLIENTMSG between server and client 2 #ifndef _clientmsg 3 #define _clientmsg 4 5 //USER MSG EXIT for OP of CLIENTMSG 6 #define EXIT -1 7 #define USER 1 8 #define MSG 2 9 #define OK 3 10 11 #ifndef CMSGLEN 12 #define CMSGLEN 100 13 #endif 14 15 struct CLIENTMSG{ 16 int OP; 17 char username[20]; 18 char buf[CMSGLEN]; 19 }; 20 21 #endif
而後實現一下servermsg.h,用於服務器內部的轉發子進程和通訊子進程之間的消息傳遞。
1 //SERVERMSG for communicate to translate 2 //MESSAGE for translate to communicate 3 #ifndef _servermsg 4 #define _servermsg 5 6 #include <netinet/in.h> 7 #include "clientmsg.h" 8 9 10 #ifndef CMSGLEN 11 #define CMSGLEN 100 12 #endif 13 14 15 struct SERVERMSG{ 16 int OP; 17 char username[20]; 18 char buf[CMSGLEN]; 19 struct sockaddr_in client; 20 int stat; 21 int qid; 22 }; 23 24 struct MESSAGE{ 25 long msgtype; 26 struct SERVERMSG msg; 27 }; 28 29 #endif
因爲須要操做信號量,因此將一些信號量的操做作成函數
semaphore.h
1 #ifndef _semaphore 2 #define _semaphore 3 4 union semun 5 { 6 int val; 7 struct semid_ds *buf; 8 unsigned short *array; 9 }; 10 11 int CreateSem(key_t key,int value); 12 int Sem_P(int semid); 13 int Sem_V(int semid); 14 int GetvalueSem(int semid); 15 void DestroySem(int semid); 16 17 18 #endif
對函數的實現:semaphore.c
1 #include <stdlib.h> 2 #include <fcntl.h> 3 #include <sys/sem.h> 4 #include "semaphore.h" 5 6 int CreateSem(key_t key,int value) 7 { 8 union semun sem; 9 int semid; 10 sem.val=value; 11 semid=semget(key,1,IPC_CREAT); 12 if (semid==-1){ 13 perror("semget error"); exit(1); 14 } 15 semctl(semid,0,SETVAL,sem); 16 return semid; 17 } 18 19 int Sem_P(int semid) 20 { 21 struct sembuf sops={0,-1,IPC_NOWAIT}; 22 return (semop(semid,&sops,1)); 23 } 24 25 int Sem_V(int semid) 26 { 27 struct sembuf sops={0,+1,IPC_NOWAIT}; 28 return (semop(semid,&sops,1)); 29 } 30 31 int GetvalueSem(int semid) 32 { 33 union semun sem; 34 return semctl(semid,0,GETVAL,sem); 35 } 36 void DestroySem(int semid) 37 { 38 union semun sem; 39 sem.val=0; 40 41 semctl(semid,0,IPC_RMID,sem); 42 }
接下來是很是重要的服務器端實現(裏面有不少調試信息,比較懶沒有刪掉,直接在裏面註釋掉了。)
server.c
1 #include <stdio.h> 2 #include <string.h> 3 #include <sys/socket.h> 4 #include <netinet/in.h> 5 #include <stdlib.h> 6 #include <sys/types.h> 7 #include <sys/wait.h> 8 #include <sys/stat.h> 9 #include <unistd.h> 10 #include <fcntl.h> 11 #include <sys/ipc.h> 12 #include "semaphore.h" 13 #include "servermsg.h" 14 15 16 void trans_process(int semid); 17 void communicate_process(int connetfd,int qid,struct sockaddr_in client); 18 19 int main(){ 20 21 struct sockaddr_in server; 22 struct sockaddr_in client; 23 int listenfd,connetfd; 24 char ip[20]; 25 int port; 26 int addrlen; 27 struct CLIENTMSG clientMsg; 28 int ret,status; 29 /*---------------------socket-------------------*/ 30 if((listenfd = socket(AF_INET,SOCK_STREAM,0))== -1){ 31 perror("socket() error\n"); 32 exit(1); 33 } 34 35 /*----------------------IO-----------------------*/ 36 printf("Please input the ip:\n"); 37 scanf("%s",ip); 38 printf("Please input the port:\n"); 39 scanf("%d",&port); 40 41 /*---------------------bind----------------------*/ 42 bzero(&server,sizeof(server)); 43 server.sin_family = AF_INET; 44 server.sin_port = htons(port); 45 server.sin_addr.s_addr = inet_addr(ip); 46 if(bind(listenfd,(struct sockaddr *)&server,sizeof(server))== -1){ 47 perror("bind() error\n"); 48 exit(1); 49 } 50 51 /*----------------------listen-------------------*/ 52 if (listen(listenfd,5)== -1){ 53 perror("listen() error\n"); 54 exit(1); 55 } 56 57 //建立命名管道 58 unlink("SERVER"); 59 mkfifo("SERVER",O_CREAT); 60 int rd = open("SERVER",O_RDONLY|O_NONBLOCK); 61 int semid; 62 key_t k = ftok(".",'b'); 63 semid = CreateSem(k,5); 64 pid_t pid_1,pid_2; 65 pid_1 = fork(); 66 if(pid_1 == 0){ 67 trans_process(semid); 68 exit(0); 69 } 70 else if(pid_1 > 0){ 71 while(1){ 72 addrlen = sizeof(client); 73 if((connetfd = accept(listenfd,(struct sockaddr *)&client,&addrlen))== -1){ 74 perror("accept() error\n"); 75 exit(1); 76 } 77 ret = Sem_P(semid); 78 if(ret == 0){ 79 int qid; 80 read(rd,&qid,sizeof(qid)); 81 //printf("qid1:%d\n",qid ); 82 pid_2 = fork(); 83 if (pid_2 > 0){ 84 close(connetfd); 85 waitpid(pid_2,&status,WNOHANG); 86 continue; 87 } 88 else if(pid_2 == 0){ 89 communicate_process(connetfd,qid,client); 90 exit(0); 91 } 92 else { 93 perror("the second fork error\n"); 94 } 95 } 96 else { 97 clientMsg.OP = EXIT; 98 send(connetfd,&clientMsg,sizeof(clientMsg),0); 99 close(connetfd); 100 } 101 waitpid(pid_1,&status,WNOHANG); 102 103 } 104 } 105 else { 106 perror("first time fork error\n"); 107 } 108 /*----------------------close-------------------*/ 109 close(connetfd); 110 close(listenfd); 111 112 return 0; 113 } 114 115 116 /*----------------------------函數實現區----------------------------*/ 117 void trans_process(int semid){ 118 struct SERVERMSG ent[5]; 119 struct MESSAGE sendMsg; 120 struct SERVERMSG msg; 121 int i; 122 for(i=0;i<5;i++){ 123 ent[i].stat = 0; 124 } 125 int wfd = open("SERVER",O_WRONLY|O_NONBLOCK); 126 for(i=0;i<5;i++){ 127 key_t key = ftok(".",(char)i+102); 128 ent[i].qid = msgget(key,IPC_CREAT); 129 write(wfd,&ent[i].qid,sizeof(ent[i].qid)); 130 } 131 unlink("CLIENT"); 132 mkfifo("CLIENT",O_CREAT); 133 int rfd = open("CLIENT",O_RDONLY|O_NONBLOCK); 134 int len; 135 while(1){ 136 bzero(&msg,sizeof(msg)); 137 len = read(rfd,&msg,sizeof(msg)); 138 //printf(" %d,%s ,%s\n",msg.OP,msg.username,msg.buf ); 139 //sleep(3); 140 if(len > 0){ 141 if(msg.OP == USER){ 142 for(i=0;i<5;i++){ 143 if(ent[i].qid == msg.qid){ 144 bcopy(msg.username,ent[i].username,strlen(msg.username)); 145 ent[i].client = msg.client; 146 ent[i].stat = 1; 147 break; 148 } 149 } 150 } 151 else if(msg.OP == EXIT){ 152 for(i=0;i<5;i++){ 153 if(ent[i].qid == msg.qid){ 154 ent[i].stat = 0; 155 write(wfd,&ent[i].qid,sizeof(ent[i].qid)); 156 Sem_V(semid); 157 break; 158 } 159 } 160 } 161 //bzero(&sendMsg,sizeof(sendMsg)); 162 sendMsg.msg = msg; 163 for(i=0;i<5;i++){ 164 if(ent[i].stat == 1){ 165 printf("stat 1...\n"); 166 int m_len = sizeof(msg); 167 int sta=msgsnd(ent[i].qid,&sendMsg,len,0); 168 //printf("flag:%d\n",sta ); 169 } 170 } 171 } 172 else { 173 continue; 174 } 175 } 176 177 } 178 179 void communicate_process(int connetfd,int qid,struct sockaddr_in client){ 180 struct CLIENTMSG sendMsg; 181 struct CLIENTMSG recvMsg; 182 struct MESSAGE server_Msg; 183 struct SERVERMSG client_sndMsg; 184 struct SERVERMSG msg; 185 int status; 186 int wfd = open("CLIENT",O_WRONLY|O_NONBLOCK); 187 pid_t pid; 188 pid = fork(); 189 if(pid < 0){ 190 perror("communicate_process fork error\n"); 191 } 192 else if (pid == 0){ 193 bzero(&sendMsg,sizeof(sendMsg)); 194 sendMsg.OP = OK; 195 send(connetfd,&sendMsg,sizeof(sendMsg),0); 196 while(1){ 197 int m_len = sizeof(msg); 198 bzero(&server_Msg,sizeof(server_Msg)); 199 int sta=msgrcv(qid,&server_Msg,m_len,0,0); 200 //printf("flag:%d\n",sta ); 201 //printf("send..%d,%s,%s\n",server_Msg.msg.OP,server_Msg.msg.username,server_Msg.msg.buf ); 202 bzero(&sendMsg,sizeof(sendMsg)); 203 bcopy(server_Msg.msg.username,sendMsg.username,strlen(server_Msg.msg.username)); 204 sendMsg.OP = server_Msg.msg.OP; 205 bcopy(server_Msg.msg.buf,sendMsg.buf,strlen(server_Msg.msg.buf)); 206 //printf("send..%d,%s,%s\n",sendMsg.OP,sendMsg.username,sendMsg.buf ); 207 send(connetfd,&sendMsg,sizeof(sendMsg),0); 208 } 209 } 210 else{ 211 while(1){ 212 bzero(&recvMsg,sizeof(recvMsg)); 213 int len =recv(connetfd,&recvMsg,sizeof(recvMsg),0); 214 if(len > 0){ 215 if(recvMsg.OP == USER){ 216 printf("user %s login from ip:%s,port:%d\n",recvMsg.username,inet_ntoa(client.sin_addr),ntohs(client.sin_port) ); 217 client_sndMsg.OP = USER; 218 } 219 else if(recvMsg.OP == EXIT){ 220 printf("user %s is logout\n",recvMsg.username ); 221 client_sndMsg.OP = EXIT; 222 write(wfd,&client_sndMsg,sizeof(client_sndMsg)); 223 break; 224 } 225 else if(recvMsg.OP == MSG){ 226 client_sndMsg.OP = MSG; 227 } 228 bzero(&client_sndMsg,sizeof(client_sndMsg)); 229 bcopy(recvMsg.username,client_sndMsg.username,strlen(recvMsg.username)); 230 bcopy(recvMsg.buf,client_sndMsg.buf,strlen(recvMsg.buf)); 231 client_sndMsg.client = client; 232 //printf("qid2:%d\n",qid ); 233 client_sndMsg.qid = qid; 234 client_sndMsg.OP = recvMsg.OP; 235 write(wfd,&client_sndMsg,sizeof(client_sndMsg)); 236 237 } 238 else{ 239 continue; 240 } 241 } 242 kill(pid,SIGKILL); 243 waitpid(pid,&status,WNOHANG); 244 close(wfd); 245 close(connetfd); 246 } 247 }
寫出了服務端,就能夠很是容易的寫出客戶端了。
client.c
1 #include <stdio.h> 2 #include <string.h> 3 #include <sys/socket.h> 4 #include <netinet/in.h> 5 #include <stdlib.h> 6 #include <sys/types.h> 7 #include <sys/wait.h> 8 #include <signal.h> 9 #include <unistd.h> 10 #include "clientmsg.h" 11 12 int main(){ 13 int sockfd; 14 char ip[20]; 15 int port; 16 int status; 17 pid_t pid; 18 struct sockaddr_in server; 19 struct CLIENTMSG clientMsg; 20 21 /*---------------------socket---------------------*/ 22 if((sockfd = socket(AF_INET,SOCK_STREAM,0))== -1){ 23 perror("socket error\n"); 24 exit(1); 25 } 26 27 /*---------------------connect--------------------*/ 28 printf("Please input the ip:\n"); 29 scanf("%s",ip); 30 printf("Please input the port:\n"); 31 scanf("%d",&port); 32 bzero(&server,sizeof(server)); 33 server.sin_family = AF_INET; 34 server.sin_port = htons(port); 35 inet_aton(ip,&server.sin_addr); 36 if(connect(sockfd,(struct sockaddr *)&server,sizeof(server))== -1){ 37 perror("connect() error\n"); 38 exit(1); 39 } 40 recv(sockfd,&clientMsg,sizeof(clientMsg),0); 41 if(clientMsg.OP == OK){ 42 int len; 43 pid = fork(); 44 if(pid == 0){ 45 while(1){ 46 bzero(&clientMsg,sizeof(clientMsg)); 47 len =recv(sockfd,&clientMsg,sizeof(clientMsg),0); 48 if(len > 0){ 49 if(clientMsg.OP ==USER){ 50 printf("the user %s is login.\n",clientMsg.username ); 51 } 52 else if(clientMsg.OP == EXIT){ 53 printf("the user %s is logout.\n",clientMsg.username); 54 } 55 else if(clientMsg.OP == MSG){ 56 printf("%s: %s\n",clientMsg.username,clientMsg.buf ); 57 } 58 } 59 } 60 exit(EXIT_SUCCESS); 61 } 62 else if(pid > 0){ 63 printf("Please input the username:\n"); 64 scanf("%s",clientMsg.username); 65 clientMsg.OP = USER; 66 send(sockfd,&clientMsg,sizeof(clientMsg),0); 67 while(1){ 68 clientMsg.OP = MSG; 69 scanf("%s",clientMsg.buf); 70 if(strcmp("bye",clientMsg.buf) == 0){ 71 clientMsg.OP = EXIT; 72 send(sockfd,&clientMsg,sizeof(clientMsg),0); 73 break; 74 } 75 send(sockfd,&clientMsg,sizeof(clientMsg),0); 76 77 } 78 kill(pid,SIGKILL); 79 waitpid(pid,&status,WNOHANG); 80 } 81 else{ 82 perror("fork error!\n"); 83 } 84 } 85 else{ 86 printf("以達到最大鏈接數!\n"); 87 } 88 /*------------------------close--------------------------*/ 89 close(sockfd); 90 91 return 0; 92 }
最後是makefile:
main:server.o client.o semaphore.o gcc server.o semaphore.o -oserver gcc client.o -oclient server.o:server.c semaphore.h clientmsg.h servermsg.h gcc -c server.c client.o:client.c clientmsg.h gcc -c client.c semaphore.o:semaphore.h semaphore.c gcc -c semaphore.c clean: rm -rf *.o
下面上一下演示過程:(測試環境,Red Hat Enterprise Linux 6 + centos系Linux,ubuntu下可能會有些問題。)
首先先把服務端啓動開來,爲了方便測試,這裏直接使用的是127.0.0.1的localhost。
而後啓動兩個客戶端用來測試,在用戶登陸的時候客戶端會有消息提醒。服務端會有日誌打印輸出客戶端的名字和登陸ip、端口。
客戶能夠發送消息了,如圖發送與接收均正常。能夠同時啓動<=5個客戶端進行羣聊,這裏爲了簡單演示只是啓動了2個。(修改信號量代碼能夠實現n多個客戶的同時登錄):
輸入bye之後便可退出聊天並下線。當有客戶下線的時候,在線的客戶端會收到下線提醒,客戶端會有日誌打印輸出。
這個實驗內容前先後後花了我2天才寫完,剛開始沒有弄清楚這一整套的工做機制與流程,寫起來非常吃力,程序就是各類調不通。原本都想放棄了,可是後來仍是咬咬牙堅持了一下來,飯要一口一口吃,程序要一點一點的寫,萬事不能操之過急,寫代碼必定要心平氣和,頭腦清晰。因爲gdb調試工具用的不是很熟練,只能在程序裏面一段一段的print變量來DEBUG,非常辛苦啊。