linux網絡編程學習筆記之五 -----併發機制與線程�

進程線程分配方式

簡述下常見的進程和線程分配方式:(好吧,我僅僅是舉幾個樣例做爲筆記。。。併發的水太深了,不敢妄談。。。)java

一、進程線程預分配node

簡言之,當I/O開銷大於計算開銷且併發量較大時,爲了節省每次都要建立和銷燬進程和線程的開銷。可以在請求到達前預先進行分配。多線程

二、進程線程延遲分配併發

預分配節省了處理時的負擔,但操做系統管理這些進程線程也會帶來必定的開銷。由此,有個折中的方法是,當某個處理需要花費較長時間的時候,咱們建立一個併發的進程或線程來處理該請求。實現也很是easy,在主線程中定時,定時到期,開新分支。socket

三、前面二者結合tcp

仍是舉個樣例,比方可以:在啓動時不進行預分配,某個處理太長,則建立從進程,任務結束後不退出。函數


多進程與多線程比較

可以參考這篇論文:Linux下多進程和多線程性能分析  和這篇Blog:多進程or多線程  總結起來,在任務運行效率上,在任務量較大(文中單次5k以上),多進程的效率高點,反之,多線程站優點,但整體上出入不大。而在建立和銷燬的效率上,線程的優點明顯,約爲5~6倍。而後在server上,併發量不大(小於幾千),預先建立線程也沒太大優點,因爲動態管理線程的開銷亦不可忽略。性能


線程池

比較全面和概要的介紹可以參看:線程池的介紹及簡單實現操作系統

基本思路是,預先建立必定數量的線程,讓它們處於堵塞狀態,佔用很是小的內存空間。當任務到來時,選擇一個空暇的線程運行,完畢後線程不退出,繼續堵塞。池子的建立銷燬和管理有一個線程單獨完畢。.net

進一步地,動態地對線程的數量進行管理,負載較大時,添加�線程數量。負載小時,下降之,設法讓一段時間不活躍的線程退出,比方讓線程在等待下一個請求前先啓動一個定時器,若請求到達前定時器到期,則線程退出。


對於處理時間短,處理數目巨大的狀況,線程池有自然優點。尤爲是對性能要求高的應用或者突發性大規模請求,比方電商秒殺神馬的。

線程池的實現可以參考libthreadpool,一個開源的庫,sourceforge上能找到源代碼

我用簡單的模型實現了一個,功能上基本知足,主從線程的流程例如如下圖所看到的,喚醒空暇線程的時候不加以區分,即steven說的驚羣,這會必定程度上的損失性能。與之相應的是有主線程採取必定的方式對空暇線程的喚醒進行調度以均衡負載和工做量。如下的代碼是個人1.0版(改得太亂了,看官們勿怪),更進一步的功能,如動態地改變池子的尺寸,興許繼續無缺


pthread_pool.h

#ifndef _PTHREAD_POOL_H_
#define _PTHREAD_POOL_H_

#include<stdio.h>
#include<unistd.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>

#define MAX_PTHREAD_NUM 100

typedef struct task_node{
	void * (*func)(void * p);
	void * arg;
	struct task_node* next;
}task_node;

typedef struct p_pool{
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	task_node *head, *tail;
	pthread_t *p_tid;			//mark , unsigned long
	int max_num;
	int current_num;
	int working_num;
	int if_destory;
	int decrease;
}p_pool;

p_pool * pool = NULL;

void pool_init(int pthread_num);
void *pthread_process(void *arg);
int add_task(void * (*func)(void *p), void *arg);
void pool_destory();

#endif

pthread_pool.c

/* 
 *   a simple thread pool
 *				Mon Jun 9 21:44:36 CST 2014
 *              by  Simon Xia
 */
#include"pthread_pool.h"

/* each worker thread's thread function to handle the task */
void *pthread_process(void *arg)
{
	task_node *tmp = NULL;

//	printf("Now in the %lu thread\n", pthread_self());
	while (1) {
		pthread_mutex_lock(&pool->mutex);
//		printf("%lu thread get lock\n", pthread_self());

		while (!pool->head && !pool->if_destory/* && !pool->decrease*/) { //While !  用是否有任務來控制
//			printf("%lu thread will wait\n", pthread_self());
			pthread_cond_wait(&pool->cond, &pool->mutex);
		}

//		printf("%lu thread: signal is coming\n", pthread_self());
		if (pool->if_destory /*|| pool->decrease*/)
			break;

		tmp = pool->head;

		pool->head = pool->head->next;
	//		pool->working_num++;

		pthread_mutex_unlock(&pool->mutex);
//		printf("%lu thread pick task from queue\n", pthread_self());
		(tmp->func)(tmp->arg);
//		printf("%lu thread finish task\n", pthread_self());
		free(tmp);
		tmp = NULL; //mark

		/*
			pthread_mutex_lock(&pool->mutex);
			pool->working_num--;
			pthread_mutex_unlock(&pool->mutex);
			*/

	}
	pthread_mutex_unlock(&pool->mutex);//先解鎖!!
	printf("%lu thread will exit\n", pthread_self());
	pthread_exit(NULL);
}

/* main thread function to manage the thread pool */
/*
void *pthread_main(void *arg)
{
	printf("This is main thread\n");
	int i;
	while (1)
	{
		usleep(50000);
		pthread_mutex_lock(&pool->mutex);
		if (pool->if_destory)
			break;
		if (pool->working_num == pool->current_num) {
			for (i = pool->current_num; i < 2 * pool->current_num; i++)
				pthread_create(&pool->p_tid[i], NULL, pthread_process, NULL);
			pool->current_num *= 2;
			printf("The number of thread has been enlarged to %d\n", pool->current_num);
		}
		else if (pool->working_num <= pool->current_num / 4){
			pool->decrease = 1;
			pthread_mutex_unlock(&pool->mutex);
			for (i = 0; i < pool->current_num / 2; i++)
				pthread_cond_signal(&pool->cond);
			pool->current_num /= 2;
			pool->decrease = 0;
			printf("The number of thread has been decrease to %d\n", pool->current_num);
		}
		pthread_mutex_unlock(&pool->mutex);
	}
	pthread_exit(NULL);
}
*/

/* Initialize the thread pool 
 * Input: number of worker thread
 */
void pool_init(int pthread_num)
{
	int i = 0;

	pool = (p_pool*)malloc(sizeof(p_pool));
	pthread_mutex_init(&pool->mutex, NULL);
	pthread_cond_init(&pool->cond, NULL);
	pool->head = pool->tail = NULL;
	pool->max_num = MAX_PTHREAD_NUM;
	pool->current_num = pthread_num;
	pool->working_num = 0;
	pool->if_destory = 0;
	pool->decrease = 0;
	pool->p_tid = (pthread_t*)malloc(pthread_num * sizeof(pthread_t));
	
//	pthread_create(&pool->p_tid[i], NULL, pthread_main, NULL);

	for (i = 0; i < pthread_num; i++)
		pthread_create(&pool->p_tid[i], NULL, pthread_process, NULL);

}

/* add task into task queue */
int add_task(void * (*func)(void *p), void *arg)
{
	task_node *tmp = (task_node*)malloc(sizeof(task_node));
	tmp->func = *func; //Mark
	tmp->arg = arg;
	tmp->next = NULL;

	pthread_mutex_lock(&pool->mutex);
	if (pool->head) {
		pool->tail = pool->tail->next = tmp;
	}
	else {
		pool->tail = pool->head = tmp;
	}

	pthread_mutex_unlock(&pool->mutex);
	//不加不行?
	//printf("Add task %d success!\n",*(int*)tmp->arg);
	//sleep(1);
	pthread_cond_signal(&pool->cond);
	
	tmp = NULL; //can't free
	return 0;
}

/* destory the pool after all work */
void pool_destory()
{
	int i;

//	pthread_mutex_lock(&pool->mutex);
	pool->if_destory = 1;
//	pthread_mutex_unlock(&pool->mutex);

	pthread_cond_broadcast(&pool->cond);

	for (i = 0; i < pool->current_num; i++)
	{
		if (!pthread_join(pool->p_tid[i], NULL))
			printf("Success to collect thread %lu\n", pool->p_tid[i]);
		else
			printf("Fail to collect thread %lu\n", pool->p_tid[i]);
	}

	free(pool->p_tid);
	free(pool->head);
	free(pool->tail);

	pthread_cond_destroy(&pool->cond);
	pthread_mutex_destroy(&pool->mutex);

	free(pool);
	pool = NULL;
}

基於這個線程池的服務端程序:

#include"simon_socket.h"

#define SERV_PORT 12345
#define THREAD_CNT 10

extern void pool_init(int );
extern int add_task(void* (*) (void*), void*);
extern void pool_destory();

typedef struct client_info{
	int fd;
	struct sockaddr_in addr;
	struct client_info *next;
}client_info;

void *process(void *arg)
{
	process_client(((client_info*)arg)->fd, &((client_info*)arg)->addr);
	return NULL;
}

void sig_int(int signo)
{
	pool_destory();
	exit(0);
}

int main()
{
	int sockfd, acfd;
	size_t sin_len;
	struct sockaddr_in client_addr;
	client_info *info_tmp, *info_head = NULL, *info_tail = NULL;

	signal(SIGINT, sig_int);

	sin_len = sizeof(struct sockaddr);
	sockfd = init_tcp_psock(SERV_PORT);
	pool_init(THREAD_CNT);


	for ( ; ; )
	{
		if ((acfd = accept(sockfd, (struct sockaddr *)&client_addr, &sin_len)) == -1)
		{
			perror("Accept request failed: ");
			return 1;
		}
		else
			printf("Get a connection from %s:%d !\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));

		info_tmp = (client_info *)malloc(sizeof(client_info));
		memset(info_tmp, 0, sizeof(client_info));
		info_tmp->fd = acfd;
		info_tmp->addr = client_addr;
		info_tmp->next = NULL;
		
		if (info_head) {
			info_tail = info_tail -> next = info_tmp;
		}
		else{
			info_head = info_tail = info_tmp;
		}
		add_task(process, (void*)info_tmp);
	}

	for (info_tmp = info_head; info_tmp; free(info_tmp))
		info_head = info_head -> next;

	return 0;
}

吐槽下多線程的調試。。。gdb調試多線程有點蛋疼,單步時很是easypthread_cond_wait把pthread_cond_signal的信號錯過,出現各類錯誤,比方Cannot find bounds of current function等,建議你們仍是多作輸出,或者用日誌的方式調。
set scheduler-locking off|on|step  這個命令仍是很是實用的,因爲用step或者continue命令調試當前被調試線程的時候,其它線程也是同一時候運行的。
詳細地:off 不鎖定不論什麼線程,也就是所有線程都運行,這是默認值。 on 僅僅有當前被調試程序會運行。 step 在單步的時候,除了next過一個函數的狀況之外,僅僅有當前線程會運行。

反正我最後是老老實實輸出。。各位路過大牛有什麼好方法,求指點 ~~

相關文章
相關標籤/搜索