epoll實現多人聊天室

utility.h頭文件ios

#ifndef UTILITY_H_INCLUDED
#define UTILITY_H_INCLUDED
#include <iostream>
#include <list>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
using namespace std;
// clients_list save all the clients's socket
list<int> clients_list;//存儲服務端的在線客戶
/**********************   macro defintion **************************/
// server ip
#define SERVER_IP "127.0.0.1"//使用本機環路測試地址做爲鏈接socket綁定的ip地址
// server port
#define SERVER_PORT 8888//使用8888做爲鏈接socket綁定的端口
//epoll size
#define EPOLL_SIZE 5000//epoll事件表最大事件數量
//message buffer size
#define BUF_SIZE 0xFFFF//讀寫緩衝最大值
#define SERVER_WELCOME "Welcome you join  to the chat room! Your chat ID is: Client #%d"
#define SERVER_MESSAGE "ClientID %d say >> %s"
// exit
#define EXIT "EXIT"
#define CAUTION "There is only one int the char room!"
/**********************   some function **************************/
/**
  * @param sockfd: socket descriptor
  * @return 0
**/
int setnonblocking(int sockfd)//將文件描述符設爲非阻塞
{
    fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)| O_NONBLOCK);
    return 0;
}
/**
  * @param epollfd: epoll handle
  * @param fd: socket descriptor
  * @param enable_et : enable_et = true, epoll use ET; otherwise LT
**/
void addfd( int epollfd, int fd, bool enable_et )//將感興趣的fd掛到epollfd指向的事件表
{
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    if( enable_et )
        ev.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
    setnonblocking(fd);
    printf("fd added to epoll!\n\n");
}
/**
  * @param clientfd: socket descriptor
  * @return : len
**/
int sendBroadcastmessage(int clientfd)//廣播消息
{
    // buf[BUF_SIZE] receive new chat message
    // message[BUF_SIZE] save format message
    char buf[BUF_SIZE], message[BUF_SIZE];
    bzero(buf, BUF_SIZE);
    bzero(message, BUF_SIZE);
    // receive message
    printf("read from client(clientID = %d)\n", clientfd);
    int len = recv(clientfd, buf, BUF_SIZE, 0);//接收clientfd號聊天室的數據
    if(len == 0)  // len = 0 means the client closed connection
    {
        close(clientfd);
        clients_list.remove(clientfd); //server remove the client
        printf("ClientID = %d closed.\n now there are %d client in the char room\n", clientfd, (int)clients_list.size());
    }
    else  //broadcast message
    {
        if(clients_list.size() == 1) { // this means There is only one int the char room
            send(clientfd, CAUTION, strlen(CAUTION), 0);
            return len;
        }
        // format message to broadcast
        sprintf(message, SERVER_MESSAGE, clientfd, buf);//SERVER_MESSAGE="ClientID %d say >> %s",以此格式把數據發送給全部聊天室
        list<int>::iterator it;
        for(it = clients_list.begin(); it != clients_list.end(); ++it) {//遍歷全部聊天室,向它們發送數據,除了提供數據的的聊天室
           if(*it != clientfd){
                if( send(*it, message, BUF_SIZE, 0) < 0 ) { perror("error"); exit(-1);}
           }
        }
    }
    return len;
}
#endif

 客戶端.cpp編程

客戶端要處理的事件有2種,1.來自服務器的數據或者斷開鏈接 .2.處理用戶輸入的數據。因此爲了高效使用併發編程,父進程處理服務器相關的操做,子進程處理用戶輸入服務器

 代碼中isClientwork狀態用來標記該聊天室是否工做,當使用了fork後,父子進程共享數據,可是數據會寫時更新,即不管父子進程修改這個變量都不會影響對方,由於修改這個值後會爲子進程分配新的數據空間併發

EPOLLIN:表示對應的文件描述符能夠讀(包括對端SOCKET正常關閉);socket

EPOLLHUP:掛起。好比管道的寫端被關閉後,讀端描述符上接收到POLLHUOP事件測試

再來看一個問題,代碼中父進程結束有2種方式,一是服務器關閉鏈接,此時socke即connect有事件發生,對端關閉。二是用戶輸入EXIT,子進程結束,寫管道關閉,由於讀管道文件描述符被掛在epoll註冊的事件表中,它的對端關閉了,因此有事件發生。二者this

讀取數據都爲0,斷定爲對端正常關閉。spa

若是輸入了EXIT,子進程關閉,而後父進程關閉。code

若是服務器關閉,那麼父進程關閉,可是子進程並無退出,它變成了孤兒進程,它的父進程爲init(1號進程),當你向終端寫數據時,由於父進程關閉了讀管道,因此寫入失敗,exit(-1),子進程退出orm

#include "utility.h"

int main(int argc, char *argv[])
{
    //用戶鏈接的服務器 IP + port
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = PF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);
    serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);

    // 建立socket
    int sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock < 0) { perror("sock error"); exit(-1); }
    // 鏈接服務端
    if(connect(sock, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
        perror("connect error");
        exit(-1);
    }

    // 建立管道,其中fd[0]用於父進程讀,fd[1]用於子進程寫
    int pipe_fd[2];
    if(pipe(pipe_fd) < 0) { perror("pipe error"); exit(-1); }

    // 建立epoll
    int epfd = epoll_create(EPOLL_SIZE);
    if(epfd < 0) { perror("epfd error"); exit(-1); }
    static struct epoll_event events[2]; 
    //將sock和管道讀端描述符都添加到內核事件表中
    addfd(epfd, sock, true);
    addfd(epfd, pipe_fd[0], true);
    // 表示客戶端是否正常工做
    bool isClientwork = true;

    // 聊天信息緩衝區
    char message[BUF_SIZE];

    // Fork
    int pid = fork();
    if(pid < 0) { perror("fork error"); exit(-1); }
    else if(pid == 0)      // 子進程
    {
        //子進程負責寫入管道,所以先關閉讀端
        close(pipe_fd[0]); 
        printf("Please input 'exit' to exit the chat room\n");

        while(isClientwork){
            bzero(&message, BUF_SIZE);
            fgets(message, BUF_SIZE, stdin);

            // 客戶輸出exit,退出
            if(strncasecmp(message, EXIT, strlen(EXIT)) == 0){
                isClientwork = 0;
            }
            // 子進程將信息寫入管道
            else {
                if( write(pipe_fd[1], message, strlen(message) - 1 ) < 0 )
                 { perror("fork error"); exit(-1); }
            }
        }
    }
    else  //pid > 0 父進程
    {
        //父進程負責讀管道數據,所以先關閉寫端
        close(pipe_fd[1]); 

        // 主循環(epoll_wait)
        while(isClientwork) {
            int epoll_events_count = epoll_wait( epfd, events, 2, -1 );
            //處理就緒事件
            for(int i = 0; i < epoll_events_count ; ++i)
            {
                bzero(&message, BUF_SIZE);

                //服務端發來消息
                if(events[i].data.fd == sock)
                {
                    //接受服務端消息
                    int ret = recv(sock, message, BUF_SIZE, 0);

                    // ret= 0 服務端關閉
                    if(ret == 0) {
                        printf("Server closed connection: %d\n", sock);
                        close(sock);
                        isClientwork = 0;
                    }
                    else printf("%s\n", message);

                }
                //子進程寫入事件發生,父進程處理併發送服務端
                else { 
                    //父進程從管道中讀取數據
                    int ret = read(events[i].data.fd, message, BUF_SIZE);

                    // ret = 0
                    if(ret == 0) isClientwork = 0;
                    else{   // 將信息發送給服務端
                      send(sock, message, BUF_SIZE, 0);
                    }
                }
            }//for
        }//while
    }

    if(pid){
       //關閉父進程和sock
        close(pipe_fd[0]);
        close(sock);
    }else{
        //關閉子進程
        close(pipe_fd[1]);
    }
    return 0;
}

 服務器.cpp

服務器只須要向內核註冊兩個感興趣的事件。1.有新客戶鏈接 2.某個客戶有數據發送.

有新客戶到來,就把該用戶添加到list中
有客戶發送數據就把這數據轉發到其餘客戶端

#include "utility.h"

int main(int argc, char *argv[])
{
    //服務器IP + port
    struct sockaddr_in serverAddr;
    serverAddr.sin_family = PF_INET;
    serverAddr.sin_port = htons(SERVER_PORT);
    serverAddr.sin_addr.s_addr = inet_addr(SERVER_IP);
    //建立監聽socket
    int listener = socket(PF_INET, SOCK_STREAM, 0);
    if(listener < 0) { perror("listener"); exit(-1);}
    printf("listen socket created \n");
    //綁定地址
    if( bind(listener, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) < 0) {
        perror("bind error");
        exit(-1);
    }
    //監聽
    int ret = listen(listener, 5);
    if(ret < 0) { perror("listen error"); exit(-1);}
    printf("Start to listen: %s\n", SERVER_IP);
    //在內核中建立事件表
    int epfd = epoll_create(EPOLL_SIZE);
    if(epfd < 0) { perror("epfd error"); exit(-1);}
    printf("epoll created, epollfd = %d\n", epfd);
    static struct epoll_event events[EPOLL_SIZE];
    //往內核事件表裏添加事件
    addfd(epfd, listener, true);
    //主循環
    while(1)
    {
        //epoll_events_count表示就緒事件的數目
        int epoll_events_count = epoll_wait(epfd, events, EPOLL_SIZE, -1);
        if(epoll_events_count < 0) {
            perror("epoll failure");
            break;
        }

        printf("epoll_events_count = %d\n", epoll_events_count);
        //處理這epoll_events_count個就緒事件
        for(int i = 0; i < epoll_events_count; ++i)
        {
            int sockfd = events[i].data.fd;
            //新用戶鏈接
            if(sockfd == listener)
            {
                struct sockaddr_in client_address;
                socklen_t client_addrLength = sizeof(struct sockaddr_in);
                int clientfd = accept( listener, ( struct sockaddr* )&client_address, &client_addrLength );

                printf("client connection from: %s : % d(IP : port), clientfd = %d \n",
                inet_ntoa(client_address.sin_addr),
                ntohs(client_address.sin_port),
                clientfd);

                addfd(epfd, clientfd, true);

                // 服務端用list保存用戶鏈接
                clients_list.push_back(clientfd);
                printf("Add new clientfd = %d to epoll\n", clientfd);
                printf("Now there are %d clients int the chat room\n", (int)clients_list.size());

                // 服務端發送歡迎信息
                printf("welcome message\n");
                char message[BUF_SIZE];
                bzero(message, BUF_SIZE);
                sprintf(message, SERVER_WELCOME, clientfd);
                int ret = send(clientfd, message, BUF_SIZE, 0);
                if(ret < 0) { perror("send error"); exit(-1); }
            }
            //處理用戶發來的消息,並廣播,使其餘用戶收到信息
            else
            {
                int ret = sendBroadcastmessage(sockfd);
                if(ret < 0) { perror("error");exit(-1); }
            }
        }
    }
    close(listener); //關閉socket
    close(epfd);    //關閉內核
    return 0;
}
相關文章
相關標籤/搜索