Copyright notice: this is an original article by CSDN blogger 「XD灬」, licensed under the CC 4.0 BY-SA agreement; please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_25425023/article/details/70199133
First comes a locker.h file, which wraps a semaphore, a mutex, and a condition variable.
The thread pool's task queue has to be protected by the mutex. When a task arrives in the queue, one thread blocked in pthread_cond_wait() must be woken up; when the pool stops, all threads must be woken, which is what pthread_cond_broadcast() is for. The short sketch below illustrates the idiom.
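In bare pthreads, that protocol has a standard shape: one mutex guards the queue, pthread_cond_signal() wakes a single consumer when work arrives, and pthread_cond_broadcast() wakes everyone for shutdown, with each consumer re-checking its predicate in a loop. This is only an illustrative sketch (the names put/get/stop_all are invented here, not taken from the files that follow):

#include <pthread.h>
#include <queue>

static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond  = PTHREAD_COND_INITIALIZER;
static std::queue<int> g_queue;
static bool g_stop = false;

void put(int v) // producer: wake exactly one waiter
{
    pthread_mutex_lock(&g_mutex);
    g_queue.push(v);
    pthread_cond_signal(&g_cond);
    pthread_mutex_unlock(&g_mutex);
}

bool get(int *out) // consumer: re-check the predicate after every wakeup
{
    pthread_mutex_lock(&g_mutex);
    while(g_queue.empty() && !g_stop)
        pthread_cond_wait(&g_cond, &g_mutex); // atomically unlocks while sleeping
    bool ok = !g_queue.empty();
    if(ok)
    {
        *out = g_queue.front();
        g_queue.pop();
    }
    pthread_mutex_unlock(&g_mutex);
    return ok; // false only when stopping with an empty queue
}

void stop_all() // shutdown: wake every waiter at once
{
    pthread_mutex_lock(&g_mutex);
    g_stop = true;
    pthread_cond_broadcast(&g_cond);
    pthread_mutex_unlock(&g_mutex);
}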
locker.h:
#ifndef _LOCKER_H_
#define _LOCKER_H_

#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

/* semaphore wrapper */
class sem_locker
{
private:
    sem_t m_sem;
public:
    // initialize the semaphore
    sem_locker()
    {
        if(sem_init(&m_sem, 0, 0) != 0)
            printf("sem init error\n");
    }
    // destroy the semaphore
    ~sem_locker()
    {
        sem_destroy(&m_sem);
    }
    // wait on the semaphore
    bool wait()
    {
        return sem_wait(&m_sem) == 0;
    }
    // post the semaphore
    bool add()
    {
        return sem_post(&m_sem) == 0;
    }
};
/* mutex wrapper */
class mutex_locker
{
private:
    pthread_mutex_t m_mutex;
public:
    mutex_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
            printf("mutex init error!");
    }
    ~mutex_locker()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    bool mutex_lock()   // lock the mutex
    {
        return pthread_mutex_lock(&m_mutex) == 0;
    }
    bool mutex_unlock() // unlock it
    {
        return pthread_mutex_unlock(&m_mutex) == 0;
    }
};
/* condition-variable wrapper */
class cond_locker
{
private:
    pthread_mutex_t m_mutex;
    pthread_cond_t m_cond;
public:
    // initialize m_mutex and m_cond
    cond_locker()
    {
        if(pthread_mutex_init(&m_mutex, NULL) != 0)
            printf("mutex init error");
        if(pthread_cond_init(&m_cond, NULL) != 0)
        {   // the condition variable failed to init: release the mutex that did
            pthread_mutex_destroy(&m_mutex);
            printf("cond init error");
        }
    }
    // destroy mutex and cond
    ~cond_locker()
    {
        pthread_mutex_destroy(&m_mutex);
        pthread_cond_destroy(&m_cond);
    }
    // wait on the condition variable
    bool wait()
    {
        int ans = 0;
        pthread_mutex_lock(&m_mutex);
        ans = pthread_cond_wait(&m_cond, &m_mutex);
        pthread_mutex_unlock(&m_mutex);
        return ans == 0;
    }
    // wake one thread waiting on the condition variable
    bool signal()
    {
        return pthread_cond_signal(&m_cond) == 0;
    }
    // wake all threads waiting on the condition variable
    bool broadcast()
    {
        return pthread_cond_broadcast(&m_cond) == 0;
    }
};
#endif
Next is thread_pool.h.
It creates threadnum threads and calls pthread_detach() on each, so a thread's resources are reclaimed automatically when it exits. (The thread pool in my earlier post had a bug and was incomplete: when the pool shut down, not all of the threads could exit cleanly.)
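As an aside, pthread_create() followed by pthread_detach() leaves a brief window in which the thread is still joinable. An alternative (a sketch of my own, not what this pool does) is to create the thread already detached through a pthread_attr_t:

#include <pthread.h>

// Create a thread that is detached from the moment it starts.
// 'worker' and 'arg' stand in for the pool's entry point and argument.
int create_detached(pthread_t *tid, void *(*worker)(void *), void *arg)
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    int ret = pthread_create(tid, &attr, worker, arg);
    pthread_attr_destroy(&attr);
    return ret; // 0 on success, an error number otherwise
}

The pool below keeps the create-then-detach form: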
#ifndef _PTHREAD_POOL_
#define _PTHREAD_POOL_

#include "locker.h"
#include <queue>
#include <stdio.h>
#include <exception>
#include <errno.h>
#include <pthread.h>
#include <iostream>

template<class T>
class threadpool
{
private:
    int thread_number;               // number of threads in the pool
    //int max_task_number;           // maximum number of queued tasks
    pthread_t *all_threads;          // thread array
    std::queue<T *> task_queue;      // task queue
    mutex_locker queue_mutex_locker; // mutex protecting the queue
    //sem_locker queue_sem_locker;   // semaphore
    cond_locker queue_cond_locker;   // condition variable
    bool is_stop;                    // stop flag for the threads
public:
    threadpool(int thread_num = 20);
    ~threadpool();
    bool append_task(T *task);       // add a task
    void start();                    // start the pool
    void stop();                     // shut the pool down
private:
    // entry function the threads run; it calls run()
    static void *worker(void *arg);
    void run();
    T *getTask();                    // fetch a task
};
template <class T>
threadpool<T>::threadpool(int thread_num):
    thread_number(thread_num), all_threads(NULL), is_stop(false)
{   // constructor
    if(thread_num <= 0)
    {
        printf("threadpool can't init because thread_number <= 0\n");
        throw std::exception();
    }
    all_threads = new pthread_t[thread_number]; // new throws on failure
}

template <class T>
threadpool<T>::~threadpool()
{
    stop();               // wake the threads first...
    delete []all_threads; // ...then release the thread array
}
template <class T>
void threadpool<T>::stop() // shut the pool down
{
    is_stop = true;
    //queue_sem_locker.add();
    queue_cond_locker.broadcast(); // wake every waiting thread so it can exit
}
template <class T>
void threadpool<T>::start() // start the pool
{
    for(int i = 0; i < thread_number; ++i)
    {
        //printf("create the %dth pthread\n", i);
        if(pthread_create(all_threads + i, NULL, worker, this) != 0)
        {   // thread creation failed: release what was acquired and throw
            delete []all_threads;
            all_threads = NULL; // so the destructor doesn't free it again
            throw std::exception();
        }
        if(pthread_detach(all_threads[i]))
        {   // detaching failed: release what was acquired and throw
            delete []all_threads;
            all_threads = NULL;
            throw std::exception();
        }
    }
}
// add a task to the task queue
template <class T>
bool threadpool<T>::append_task(T *task) // add a task
{   // take the mutex
    queue_mutex_locker.mutex_lock();
    bool is_signal = task_queue.empty();
    // push onto the queue
    task_queue.push(task);
    queue_mutex_locker.mutex_unlock();
    // wake a thread waiting for work (only needed if the queue was empty)
    if(is_signal)
    {
        queue_cond_locker.signal();
    }
    return true;
}
template <class T>
void *threadpool<T>::worker(void *arg) // thread entry function
{
    threadpool *pool = (threadpool *)arg;
    pool->run();
    return pool;
}

template <class T>
T* threadpool<T>::getTask() // fetch a task from the queue
{
    T *task = NULL;
    queue_mutex_locker.mutex_lock();
    if(!task_queue.empty())
    {
        task = task_queue.front();
        task_queue.pop();
    }
    queue_mutex_locker.mutex_unlock();
    return task;
}
template <class T>
void threadpool<T>::run()
{
    while(!is_stop){
        T *task = getTask();
        if(task == NULL) // queue empty: wait
            queue_cond_locker.wait();
        else             // run the task, then free it (it was new-ed by the producer)
        {
            task->doit();
            delete task;
        }
    }
    //for test
    //printf("exit%lu\n", (unsigned long)pthread_self());
}
#endif
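One caveat about the code above: getTask() inspects the queue under queue_mutex_locker, but the subsequent wait() sleeps under cond_locker's own internal mutex. A task appended between those two steps is signalled before any thread is waiting, so the wakeup is lost and the thread sleeps until the next append. A hypothetical fix (a sketch, assuming the pool kept a single pthread_mutex_t/pthread_cond_t pair instead of the two wrapper objects) is to block inside the queue lock and re-check the predicate in a loop:

// Sketch of a race-free getTask(); assumes members
//   pthread_mutex_t m_mutex;  pthread_cond_t m_cond;
// replace queue_mutex_locker and queue_cond_locker.
template <class T>
T *threadpool<T>::getTask()
{
    pthread_mutex_lock(&m_mutex);
    // re-check after every wakeup: wakeups may be spurious
    while(task_queue.empty() && !is_stop)
        pthread_cond_wait(&m_cond, &m_mutex); // unlocks m_mutex while sleeping
    T *task = NULL;
    if(!task_queue.empty())
    {
        task = task_queue.front();
        task_queue.pop();
    }
    pthread_mutex_unlock(&m_mutex);
    return task; // NULL only when the pool is stopping
}

With that shape, run() no longer needs its own wait() call: it just loops on getTask() and exits when NULL comes back while is_stop is set.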
Next, the epoll wrapper.
The BaseTask and Task classes inside EpollServer.h really belong in headers of their own (BaseTask.h and Task.h); they live here purely for convenience.
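For what it's worth, the split-out header might look like this (a hypothetical BaseTask.h, not a file from the article):

// BaseTask.h -- abstract interface the thread pool runs.
#ifndef _BASE_TASK_H_
#define _BASE_TASK_H_

class BaseTask
{
public:
    virtual void doit() = 0;  // the work to perform
    virtual ~BaseTask() {}    // tasks are deleted through a BaseTask*
};

#endif

Here is the actual EpollServer.h: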
#ifndef _EPOLL_SERVER_H_
#define _EPOLL_SERVER_H_

#include <sys/socket.h>
#include <sys/types.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/epoll.h>
//#include <pthread.h>
#include "thread_pool.h"

#define MAX_EVENT 1024  // maximum number of epoll_events
#define MAX_BUFFER 2048 // maximum buffer size in bytes

class BaseTask
{
public:
    virtual void doit() = 0;
    virtual ~BaseTask() {} // tasks are deleted through a BaseTask*
};
class Task : public BaseTask
{
private:
    int sockfd;
    char order[MAX_BUFFER];
public:
    Task(char *str, int fd) : sockfd(fd)
    {
        memset(order, '\0', MAX_BUFFER);
        strcpy(order, str);
    }
    void doit() // what the task actually does
    {
        //do something with the order
        //printf("%s\n", order);
        snprintf(order, MAX_BUFFER - 1, "somedata\n");
        write(sockfd, order, strlen(order));
    }
};
class EpollServer
{
private:
    bool is_stop;                  // flag that stops the epoll_wait loop
    int threadnum;                 // number of threads
    int sockfd;                    // listening fd
    int port;                      // port
    int epollfd;                   // epoll fd
    threadpool<BaseTask> *pool;    // pointer to the thread pool
    //char address[20];
    epoll_event events[MAX_EVENT]; // epoll event array
    struct sockaddr_in bindAddr;   // address to bind
public: // constructors
    EpollServer()
    {}
    EpollServer(int ports, int thread) : is_stop(false) , threadnum(thread) ,
        port(ports), pool(NULL)
    {
    }
    ~EpollServer() // destructor
    {
        delete pool;
    }
    void init();
    void epoll();
    static int setnonblocking(int fd) // make fd nonblocking
    {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
    }
    static void addfd(int epollfd, int sockfd, bool oneshot) // register fd with epoll
    {   // oneshot restricts the fd to one handling thread at a time; since all
        // reads happen in the main thread here, every caller passes false
        epoll_event event;
        event.data.fd = sockfd;
        event.events = EPOLLIN | EPOLLET;
        if(oneshot)
        {
            event.events |= EPOLLONESHOT;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &event); // add the fd
        EpollServer::setnonblocking(sockfd);
    }
};
void EpollServer::init() // set up the EpollServer
{
    bzero(&bindAddr, sizeof(bindAddr));
    bindAddr.sin_family = AF_INET;
    bindAddr.sin_port = htons(port);
    bindAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    // create the socket
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0)
    {
        printf("EpollServer socket init error\n");
        return;
    }
    int ret = bind(sockfd, (struct sockaddr *)&bindAddr, sizeof(bindAddr));
    if(ret < 0)
    {
        printf("EpollServer bind init error\n");
        return;
    }
    ret = listen(sockfd, 10);
    if(ret < 0)
    {
        printf("EpollServer listen init error\n");
        return;
    }
    // create the epoll instance
    epollfd = epoll_create(1024);
    if(epollfd < 0)
    {
        printf("EpollServer epoll_create init error\n");
        return;
    }
    pool = new threadpool<BaseTask>(threadnum); // create the thread pool
}
void EpollServer::epoll()
{
    pool->start(); // start the thread pool
    //
    addfd(epollfd, sockfd, false);
    while(!is_stop)
    {   // block in epoll_wait
        int ret = epoll_wait(epollfd, events, MAX_EVENT, -1);
        if(ret < 0) // error handling
        {
            printf("epoll_wait error\n");
            break;
        }
        for(int i = 0; i < ret; ++i)
        {
            int fd = events[i].data.fd;
            if(fd == sockfd) // a new connection has arrived
            {
                struct sockaddr_in clientAddr;
                socklen_t len = sizeof(clientAddr);
                int confd = accept(sockfd, (struct sockaddr *)
                        &clientAddr, &len);
                if(confd < 0) // accept failed: nothing to register
                    continue;
                EpollServer::addfd(epollfd, confd, false);
            }
            else if(events[i].events & EPOLLIN) // data readable on some fd
            {
                char buffer[MAX_BUFFER];
                memset(buffer, 0, sizeof(buffer));
                int ret = read(fd, buffer, MAX_BUFFER - 1);
                if(ret == 0) // peer closed the connection: remove and close the fd
                {
                    struct epoll_event ev;
                    ev.events = EPOLLIN;
                    ev.data.fd = fd;
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev);
                    close(fd); // close (not just shutdown) so the fd is released
                    printf("%d logout\n", fd);
                    continue;
                }
                else if(ret < 0) // read error
                {
                    if(errno == EAGAIN) // nothing left to read under EPOLLET;
                        continue;       // retrying here would only busy-spin
                    printf("read error\n");
                    continue;
                }
                else // read succeeded: hand a task to the thread pool
                {
                    BaseTask *task = new Task(buffer, fd);
                    pool->append_task(task);
                }
            }
            else
            {
                printf("something else has happened\n");
            }
        }
    }
    close(sockfd); // done
    pool->stop();
}
#endif
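A final caveat on the edge-triggered (EPOLLET) mode used above: a single read() per EPOLLIN event can leave bytes sitting in the kernel buffer, and with edge triggering no further event will fire for them. A more robust handler drains the socket until read() reports EAGAIN; the helper below is a sketch of that idea using this file's types, not code from the article:

// Drain a nonblocking socket completely, as EPOLLET requires.
// Returns false when the peer has closed or a real error occurred.
static bool drain_fd(int fd, threadpool<BaseTask> *pool)
{
    char buffer[MAX_BUFFER];
    while(true)
    {
        int n = read(fd, buffer, MAX_BUFFER - 1);
        if(n > 0)
        {
            buffer[n] = '\0';
            pool->append_task(new Task(buffer, fd)); // hand off each chunk
        }
        else if(n == 0)
            return false; // peer closed the connection
        else if(errno == EAGAIN || errno == EWOULDBLOCK)
            return true;  // kernel buffer drained; wait for the next event
        else if(errno == EINTR)
            continue;     // interrupted by a signal: retry the read
        else
            return false; // genuine read error
    }
}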
Finally, a simple demo to test it.
#include "EpollServer.h"
int main(int argc, char const *argv[])
{
if(argc != 3)
{
printf("usage %s port threadnum\n", argv[0]);
return -1;
}
int port = atoi(argv[1]);
if(port == 0)
{
printf("port must be Integer\n");
return -1;
}
int threadnum = atoi(argv[2]);
if(port == 0)
{
printf("threadnum must be Integer\n");
return -1;
}
EpollServer *epoll = new EpollServer(port, threadnum);
epoll->init();
epoll->epoll();
return 0;
}
The code compiles on Ubuntu. Next time I'll follow up with measurements of how much concurrency it can support.
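For reference, a build along these lines should work, assuming the demo above is saved as main.cpp next to the two headers (the file name is mine, not stated in the article):

g++ main.cpp -o epoll_server -lpthread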