TCP/IP網絡編程 基於Linux編程_4 --多線程服務器端的實現

線程基本概念

前面咱們講過多進程服務器,但咱們知道它開銷很大,所以咱們才引入線程,咱們能夠把它當作是一種輕量級進程。它相比進程有以下幾個優勢:數組

  • 線程的建立和上下文切換開銷更小且速度更快。
  • 線程間交換數據時無需特殊技術。

進程:在操做系統構成單獨執行流的單位。
線程:在進程構成單獨執行流的單位。
它們的包含關係是,操做系統 > 進程 > 線程。進程與線程具體差別實際上是這樣的,每一個進程都有獨立的完整內存空間,它包括全局數據區,堆區,棧區,而多進程服務器之因此開銷大是由於只是爲了區分棧區裏的不一樣函數流執行而把數據區,堆區,棧區內存所有複製了一份。而多線程就高效多了,它只把棧區分離出來,進程中的數據區,堆區則共享。具體內存結構示例圖以下:
這裏寫圖片描述服務器


這裏寫圖片描述

線程建立及運行

線程具備單獨的執行流,所以須要單獨定義線程的入口函數,並且還須要請求操做系統在單獨的執行流中執行該函數,完成這個功能的函數以下:markdown

#include <pthread.h>

int pthread_create(
    pthread_t * restrict thread,//保存線程ID
    const pthread_attr_t * restrict attr,//線程屬性,NULL默認屬性
    void * (* start_routine)(void *), //線程入口函數,函數指針
    void * restrict arg //傳遞給入口函數的參數
);

實例代碼:多線程

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

void * thread_main(void *arg);

int main(int argc, char *argv[])
{
    pthread_t t_id;
    int thread_param = 5;

    if (pthread_create(&t_id, NULL, thread_main, (void *)&thread_param) != 0)
    {
        puts("pthread_create() error");
        return -1;
    }

    sleep(10);
    puts("end of main");
    return 0;
}

void * thread_main(void *arg)
{
    int i;
    int cnt =* ((int *)arg);
    for (i = 0; i < cnt; i++)
    {
        sleep(1);
        puts("running thread");
    }
    return NULL;
}

這裏寫圖片描述


上面實例是用sleep延遲來控制線程的執行的,若是主線程不作延遲那麼執行到return 0;時,進程就結束了,相應的線程也會銷燬。而明顯用sleep這種方式控制線程執行流是不合理的,下面咱們來看看一個更好的延遲函數,調用該函數的進程(或線程)將進入等待狀態,直到第一個參數爲ID的線程終止爲止。並且能夠獲得線程的入口函數返回值。併發

#include <pthread.h>

int pthread_join(pthread_t thread, void **status);
參數1:線程ID
參數2:保存線程入口函數的返回值

實例代碼:app

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
void * thread_main(void *arg);

int main(int argc, char *argv[])
{
    pthread_t t_id;
    int thread_param = 5;
    void * thr_ret;

    //建立線程
    if (pthread_create(&t_id, NULL, thread_main, (void *)&thread_param) != 0)
    {
        puts("pthread_create() error");
        return -1;
    }

    //等待線程返回
    if (pthread_join(t_id, &thr_ret) != 0)
    {
        puts("pthread_join() error");
        return -1;
    }

    printf("Thread return message: %s \n", (char *)thr_ret);
    free(thr_ret);
    return 0;
}

//線程入口函數
void * thread_main(void *arg)
{
    int i;
    int cnt =* ((int *)arg);
    char * msg = (char *)malloc(sizeof(char) * 50);
    strcpy(msg, "Hello, I am thread ~ \n");

    for (i = 0; i < cnt; i++)
    {
        puts("running thread");
    }
    return (void *)msg;
}

這裏寫圖片描述

線程存在的問題和臨界區

前面咱們知道了怎麼建立線程,但咱們都是隻建立了一個線程,下面咱們再來看看這樣一個實例,建立100個線程,它們都訪問了同一變量,其中一半對這個變量進行加1操做,一半進行減1操做,按道理其結果會等於0.socket

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0;   //long long類型是64位整數型,多線程共同訪問

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //建立100個線程,一半執行thread_inc,一半執行thread_des
    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    //等待線程返回
    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);  //+1,-1按道理結果是0
    return 0;
}

//線程入口函數1
void * thread_inc(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num += 1;//臨界區(引發問題的語句就是臨界區位置)
    return NULL;
}

//線程入口函數2
void * thread_des(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num -= 1;//臨界區
    return NULL;
}

這裏寫圖片描述

從運行結果看並非0,並且每次運行的結果都不一樣。那這是什麼緣由引發的呢? 是由於每一個線程訪問一個變量是這樣一個過程:先從內存取出這個變量值到CPU,而後CPU計算獲得改變後的值,最後再將這個改變後的值寫回內存。所以,咱們能夠很容易看出,多個線程訪問同一變量,若是某個線程還只剛從內存取出數據,還沒來得及寫回內存,這時其它線程又訪問了這個變量,因此這個值就會不正確了。函數

接下來咱們再來說講怎麼解決這個問題:線程同步post

線程同步

線程同步用於解決線程訪問順序引起的問題,通常是以下兩種狀況:測試

  1. 同時訪問同一內存空間時發生的狀況
  2. 須要指定訪問同一內存空間的線程執行順序的狀況

針對這兩種可能引起的狀況,咱們分別使用的同步技術是:互斥量和信號量。

  • 互斥量
    互斥量技術從字面也能夠理解,就是臨界區有線程訪問,其它線程就得排隊等待,它們的訪問是互斥的,實現方式就是給臨界區加鎖與釋放鎖。
#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);  //建立互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);//銷燬互斥量

int pthread_mutex_lock(pthread_mutex_t *mutex);//加鎖

int pthread_mutex_unlock(pthread_mutex_t *mutex);//釋放鎖

簡言之,就是利用lock和unlock函數圍住臨界區的兩端。當某個線程調用pthread_mutex_lock進入臨界區後,若是沒有調用pthread_mutex_unlock釋放鎖退出,那麼其它線程就會一直阻塞在臨界區以外,咱們把這種狀況稱之爲死鎖。因此臨界區圍住必定要lock和unlock一一對應。

實例代碼:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);

long long num = 0;
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //互斥量的建立
    pthread_mutex_init(&mutex, NULL);

    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);
    pthread_mutex_destroy(&mutex);   //互斥量的銷燬
    return 0;
}

/*擴展臨界區,減小加鎖,釋放鎖調用次數,但這樣變量必須加滿到50000000次後其它線程才能訪問. 這樣是延長了線程的等待時間,但縮短了加鎖,釋放鎖函數調用的時間,這裏沒有定論,本身酌情考慮*/
void * thread_inc(void * arg)
{
    pthread_mutex_lock(&mutex);  //互斥量鎖住
    for (int i = 0; i < 1000000; i++)
        num += 1;
    pthread_mutex_unlock(&mutex); //互斥量釋放鎖
    return NULL;
}

/*縮短了線程等待時間,但循環建立,釋放鎖函數調用時間增長*/
void * thread_des(void * arg)
{
    for (int i = 0; i < 1000000; i++)
    {
        pthread_mutex_lock(&mutex);
        num -= 1;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

這裏寫圖片描述

  • 信號量
    信號量與互斥量相似,只是互斥量是用鎖來控制線程訪問而信號量是用二進制0,1來完成控制線程順序。sem_post信號量加1,sem_wait信號量減1,當信號量爲0時,sem_wait就會阻斷,所以經過這樣讓信號量加1減1就能控制線程的執行順序了。
    註釋:mac上測試信號量函數返回-1失敗,之後仍是Linux上整吧,也許這些接口已通過時了…
#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);//建立信號量
int sem_destroy(sem_t *sem);//銷燬信號量
int sem_post(sem_t *sem);//信號量加1
int sem_wait(sem_t *sem);//信號量減1,爲0時阻塞

實例代碼:線程A從用戶輸入獲得值後存入全局變量num,此時線程B將取走該值並累加。該過程共進行5次,完成後輸出總和並退出程序。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;


int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    sem_init(&sem_one, 0, 0);
    sem_init(&sem_two, 0, 1);

    pthread_create(&id_t1, NULL, read, NULL);
    pthread_create(&id_t2, NULL, accu, NULL);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);

    sem_destroy(&sem_one);
    sem_destroy(&sem_two);

    return 0;
}

void * read(void * arg)
{
    int i;
    for (i = 0; i < 5; i++) {
        fputs("Input num: ", stdout);
        sem_wait(&sem_two);
        scanf("%d", &num);
        sem_post(&sem_one);
    }
    return NULL;
}

void * accu(void * arg)
{
    int sum = 0 , i;
    for (i = 0; i < 5; i++) {
        sem_wait(&sem_one);
        sum+= num;
        sem_post(&sem_two);
    }
    printf("Result: %d \n", sum);
    return NULL;
}

補充:線程的銷燬,線程建立後並非其入口函數返回後就會自動銷燬,須要手動銷燬,否則線程建立的內存空間將一直存在。通常手動銷燬有以下兩種方式:1,調用pthread_join函數,其返回後同時銷燬線程 ,是一個阻斷函數,服務端通常不用它銷燬,由於服務端主線程不宜阻斷,還要實時監聽客服端鏈接。2,調用pthread_detach函數,不會阻塞,線程返回自動銷燬線程,不過要注意調用它後不能再調用pthread_join函數,它與pthread_join主要區別就是一個是阻塞函數,一個不阻塞。

多線程併發服務端的實現

使用多線程實現了一個簡單的聊天程序,並對臨界區(clnt_cnt,clnt_socks)進行加鎖訪問.

  • 服務端:
//
// main.cpp
// hello_server
//
// Created by app05 on 15-10-22.
// Copyright (c) 2015年 app05. All rights reserved.
//臨界區是:clnt_cnt和clnt_socks訪問處

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void * handle_clnt(void * arg);
void send_msg(char *msg, int len);
void error_handling(char * msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t clnt_adr_sz;
    pthread_t t_id;
    if (argc != 2) {
        printf("Usage : %s <port> \n", argv[0]);
        exit(1);
    }

    //建立互斥量
    pthread_mutex_init(&mutx, NULL);
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_adr.sin_port = htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1)
        error_handling("bind() error");
    if(listen(serv_sock, 5) == -1)
        error_handling("listen() error");

    while (1)
    {
        clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻斷,監聽客服端鏈接請求

        //臨界區
        pthread_mutex_lock(&mutx); //加鎖
        clnt_socks[clnt_cnt++] = clnt_sock; //新鏈接的客服端保存到clnt_socks數組裏
        pthread_mutex_unlock(&mutx); //釋放鎖

        //建立線程
        pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock);
        pthread_detach(t_id); //銷燬線程,線程return後自動調用銷燬,不阻斷

        printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
    }

    close(serv_sock);
    return 0;
}

//線程執行
void * handle_clnt(void * arg)
{
    int clnt_sock = *((int *)arg);
    int str_len = 0, i;
    char msg[BUF_SIZE];

    while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
        send_msg(msg, str_len);

    //從數組中移除當前客服端
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
    {
        if (clnt_sock == clnt_socks[i])
        {
            while (i++ < clnt_cnt - 1)
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
}

//向全部鏈接的客服端發送消息
void send_msg(char * msg, int len)
{
    int i;
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}
  • 客服端
//
// main.cpp
// hello_client
//
// Created by app05 on 15-10-22.
// Copyright (c) 2015年 app05. All rights reserved.
//
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char *message);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, const char * argv[]) {
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void * thread_return;

    if(argc != 4)
    {
        printf("Usage: %s <IP> <port> \n", argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到編譯器參數裏
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1)
        error_handling("socket() error");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));

    if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");

    //多線程分離輸入和輸出
    pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
    //阻塞,等待返回
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);

    close(sock);
    return 0;
}

//發送消息
void * send_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    while (1) {
        fgets(msg, BUF_SIZE, stdin);
        if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s %s", name, msg);
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;
}

//接收消息
void * recv_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len;
    while (1) {
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        if(str_len == -1)
            return (void *)-1;
        name_msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

這裏寫圖片描述

這裏寫圖片描述

這裏寫圖片描述

相關文章
相關標籤/搜索