c++ 網絡編程(九)LINUX/windows-IOCP模型 多線程超詳細教程及多線程實現服務端

 

 

原文做者:aircrafthtml

原文連接:https://www.cnblogs.com/DOMLX/p/9661012.html前端

 

 先講Linux下(windows下在後面能夠直接跳到後面看):python

 

一.線程基本概念

前面咱們講過多進程服務器,但咱們知道它開銷很大,所以咱們才引入線程,咱們能夠把它當作是一種輕量級進程。它相比進程有以下幾個優勢:react

  • 線程的建立和上下文切換開銷更小且速度更快。
  • 線程間交換數據時無需特殊技術。

進程:在操做系統構成單獨執行流的單位。
線程:在進程構成單獨執行流的單位。
它們的包含關係是,操做系統 > 進程 > 線程。進程與線程具體差別實際上是這樣的,每一個進程都有獨立的完整內存空間,它包括全局數據區,堆區,棧區,而多進程服務器之因此開銷大是由於只是爲了區分棧區裏的不一樣函數流執行而把數據區,堆區,棧區內存所有複製了一份。而多線程就高效多了,它只把棧區分離出來,進程中的數據區,堆區則共享。具體內存結構示例圖以下:
這裏寫圖片描述linux


這裏寫圖片描述

 

二.建立線程

下面的程序,咱們能夠用它來建立一個線程:ios

#include <pthread.h> pthread_create (thread, attr, start_routine, arg)

在這裏,pthread_create 建立一個新的線程,並讓它可執行。下面是關於參數的說明:c++

參數 描述
thread 指向線程標識符指針。
attr 一個不透明的屬性對象,能夠被用來設置線程屬性。您能夠指定線程屬性對象,也可使用默認值 NULL。
start_routine 線程運行函數起始地址,一旦線程被建立就會執行。
arg 運行函數的參數。它必須經過把引用做爲指針強制轉換爲 void 類型進行傳遞。若是沒有傳遞參數,則使用 NULL。

建立線程成功時,函數返回 0,若返回值不爲 0 則說明建立線程失敗。編程

終止線程

使用下面的程序,咱們能夠用它來終止一個線程:windows

#include <pthread.h> pthread_exit (status)

在這裏,pthread_exit 用於顯式地退出一個線程。一般狀況下,pthread_exit() 函數是在線程完成工做後無需繼續存在時被調用。後端

若是 main() 是在它所建立的線程以前結束,並經過 pthread_exit() 退出,那麼其餘線程將繼續執行。不然,它們將在 main() 結束時自動被終止。

實例

如下簡單的實例代碼使用 pthread_create() 函數建立了 5 個線程,每一個線程輸出"Hello Runoob!":

#include <iostream>
// 必須的頭文件
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS 5
 
// 線程的運行函數
void* say_hello(void* args)
{
    cout << "Hello Runoob!" << endl;
    return 0;
}
 
int main()
{
    // 定義線程的 id 變量,多個變量使用數組
    pthread_t tids[NUM_THREADS];
    for(int i = 0; i < NUM_THREADS; ++i)
    {
        //參數依次是:建立的線程id,線程參數,調用的函數,傳入的函數參數
        int ret = pthread_create(&tids[i], NULL, say_hello, NULL);
        if (ret != 0)
        {
           cout << "pthread_create error: error_code=" << ret << endl;
        }
    }
    //等各個線程退出後,進程才結束,不然進程強制結束了,線程可能還沒反應過來;
    pthread_exit(NULL);
}

linux下編譯運行後結果爲:

Hello Runoob

Hello Runoob

Hello Runoob

Hello Runoob

Hello Runoob

 

如下簡單的實例代碼使用 pthread_create() 函數建立了 5 個線程,並接收傳入的參數。每一個線程打印一個 "Hello Runoob!" 消息,並輸出接收的參數,而後調用 pthread_exit() 終止線程。

//文件名:test.cpp
 
#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *PrintHello(void *threadid)
{  
   // 對傳入的參數進行強制類型轉換,由無類型指針變爲整形數指針,而後再讀取
   int tid = *((int*)threadid);
   cout << "Hello Runoob! 線程 ID, " << tid << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   int indexes[NUM_THREADS];// 用數組來保存i的值
   int rc;
   int i;
   for( i=0; i < NUM_THREADS; i++ ){      
      cout << "main() : 建立線程, " << i << endl;
      indexes[i] = i; //先保存i的值
      // 傳入的時候必須強制轉換爲void* 類型,即無類型指針        
      rc = pthread_create(&threads[i], NULL, 
                          PrintHello, (void *)&(indexes[i]));
      if (rc){
         cout << "Error:沒法建立線程," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

linux下編譯運行後結果爲:

main() : 建立線程, 0 main() : 建立線程, 1 Hello Runoob! 線程 ID, 0 main() : 建立線程, Hello Runoob! 線程 ID, 21 main() : 建立線程, 3 Hello Runoob! 線程 ID, 2 main() : 建立線程, 4 Hello Runoob! 線程 ID, 3
 

向線程傳遞參數

這個實例演示瞭如何經過結構傳遞多個參數。您能夠在線程回調中傳遞任意的數據類型,由於它指向 void,以下面的實例所示:

#include <iostream>
#include <cstdlib>
#include <pthread.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
struct thread_data{
   int  thread_id;
   char *message;
};
 
void *PrintHello(void *threadarg)
{
   struct thread_data *my_data;
 
   my_data = (struct thread_data *) threadarg;
 
   cout << "Thread ID : " << my_data->thread_id ;
   cout << " Message : " << my_data->message << endl;
 
   pthread_exit(NULL);
}
 
int main ()
{
   pthread_t threads[NUM_THREADS];
   struct thread_data td[NUM_THREADS];
   int rc;
   int i;
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout <<"main() : creating thread, " << i << endl;
      td[i].thread_id = i;
      td[i].message = (char*)"This is message";
      rc = pthread_create(&threads[i], NULL,
                          PrintHello, (void *)&td[i]);
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
   pthread_exit(NULL);
}

linux下編譯運行後結果爲:

 

main() : creating thread, 0 main() : creating thread, 1 Thread ID : 0 Message : This is message main() : creating thread, Thread ID : 21 Message : This is message main() : creating thread, 3 Thread ID : 2 Message : This is message main() : creating thread, 4 Thread ID : 3 Message : This is message Thread ID : 4 Message : This is message
 
 

鏈接和分離線程

咱們可使用如下兩個函數來鏈接或分離線程:

pthread_join (threadid, status) pthread_detach (threadid)

pthread_join() 子程序阻礙調用程序,直到指定的 threadid 線程終止爲止。當建立一個線程時,它的某個屬性會定義它是不是可鏈接的(joinable)或可分離的(detached)。只有建立時定義爲可鏈接的線程才能夠被鏈接。若是線程建立時被定義爲可分離的,則它永遠也不能被鏈接。

用途:有的人沒有在main 函數最後調用 pthread_exit(NULL); 函數等待,而是選擇sleep,這裏就能夠用pthread_join()代替sleep的不可控制,,而有時候線程結束的時候你想作某一些事情須要知道線程是否結束了,也能夠調用這個函數。

這個實例演示瞭如何使用 pthread_join() 函數來等待線程的完成。

#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
 
using namespace std;
 
#define NUM_THREADS     5
 
void *wait(void *t)
{
   int i;
   long tid;
 
   tid = (long)t;
 
   sleep(1);
   cout << "Sleeping in thread " << endl;
   cout << "Thread with id : " << tid << "  ...exiting " << endl;
   pthread_exit(NULL);
}
 
int main ()
{
   int rc;
   int i;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;
   void *status;
 
   // 初始化並設置線程爲可鏈接的(joinable)
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
   for( i=0; i < NUM_THREADS; i++ ){
      cout << "main() : creating thread, " << i << endl;
      rc = pthread_create(&threads[i], NULL, wait, (void *)&i );
      if (rc){
         cout << "Error:unable to create thread," << rc << endl;
         exit(-1);
      }
   }
 
   // 刪除屬性,並等待其餘線程
   pthread_attr_destroy(&attr);
   for( i=0; i < NUM_THREADS; i++ ){
      rc = pthread_join(threads[i], &status);
      if (rc){
         cout << "Error:unable to join," << rc << endl;
         exit(-1);
      }
      cout << "Main: completed thread id :" << i ;
      cout << "  exiting with status :" << status << endl;
   }
 
   cout << "Main: program exiting." << endl;
   pthread_exit(NULL);
}

linux下編譯運行結果:

main() : creating thread, 0
main() : creating thread, 1
main() : creating thread, 2
main() : creating thread, 3
main() : creating thread, 4
Sleeping in thread 
Thread with id : 4  ...exiting 
Sleeping in thread 
Thread with id : 3  ...exiting 
Sleeping in thread 
Thread with id : 2  ...exiting 
Sleeping in thread 
Thread with id : 1  ...exiting 
Sleeping in thread 
Thread with id : 0  ...exiting 
Main: completed thread id :0  exiting with status :0
Main: completed thread id :1  exiting with status :0
Main: completed thread id :2  exiting with status :0
Main: completed thread id :3  exiting with status :0
Main: completed thread id :4  exiting with status :0
Main: program exiting.

 

 

二.線程運行中存在的問題

 

線程存在的問題和臨界區

前面咱們知道了怎麼建立線程,下面咱們再來看看這樣一個實例,建立100個線程,它們都訪問了同一變量,其中一半對這個變量進行加1操做,一半進行減1操做,按道理其結果會等於0.

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);
long long num = 0;   //long long類型是64位整數型,多線程共同訪問

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //建立100個線程,一半執行thread_inc,一半執行thread_des
    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    //等待線程返回
    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);  //+1,-1按道理結果是0
    return 0;
}

//線程入口函數1
void * thread_inc(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num += 1;//臨界區(引發問題的語句就是臨界區位置)
    return NULL;
}

//線程入口函數2
void * thread_des(void * arg)
{
    for (int i = 0; i < 50000000; i++)
        num -= 1;//臨界區
    return NULL;
}

從運行結果看並非0,並且每次運行的結果都不一樣。那這是什麼緣由引發的呢? 是由於每一個線程訪問一個變量是這樣一個過程:先從內存取出這個變量值到CPU,而後CPU計算獲得改變後的值,最後再將這個改變後的值寫回內存。所以,咱們能夠很容易看出,多個線程訪問同一變量,若是某個線程還只剛從內存取出數據,還沒來得及寫回內存,這時其它線程又訪問了這個變量,因此這個值就會不正確了。

 

爲何會出現這種狀況呢,來舉個例子:

 

 

 

 

 

 

如上圖所示:兩個線程都要將某一個共同訪問的變量加1,

 

 

 

 

 

 

 

 

就像上面說的這個運算過程是:線程1先拿到值而後通過cpu的運算在賦值回去,而後線程2在取值運算放回,上圖實現的是最理想的狀況,假如這時候線程一拿到了值99,同時線程二沒間隔的也拿了99,這時候就要出問題了。線程一運算後賦值100回去,而後線程二運算後又賦值100回去,,,注意了哈,這裏兩個線程都是爲了Num++服務,他們這樣搞事情不就表明一個作了無用功嗎?(我胖虎要是還拿的動刀還不打死你!!!)

 

 

 

 

這些看完應該就理解了爲何須要線程同步!!!!以及線程同步的重要性了吧!!

 

 

 

 

接下來咱們再來說講怎麼解決這個問題:線程同步

線程同步

線程同步用於解決線程訪問順序引起的問題,通常是以下兩種狀況:

  1. 同時訪問同一內存空間時發生的狀況
  2. 須要指定訪問同一內存空間的線程執行順序的狀況

針對這兩種可能引起的狀況,咱們分別使用的同步技術是:互斥量和信號量。

    • 互斥量
      互斥量技術從字面也能夠理解,就是臨界區有線程訪問,其它線程就得排隊等待,它們的訪問是互斥的,實現方式就是給臨界區加鎖與釋放鎖。

 

 

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);  //建立互斥量

int pthread_mutex_destroy(pthread_mutex_t *mutex);//銷燬互斥量

int pthread_mutex_lock(pthread_mutex_t *mutex);//加鎖

int pthread_mutex_unlock(pthread_mutex_t *mutex);//釋放鎖

 

簡言之,就是利用lock和unlock函數圍住臨界區的兩端。當某個線程調用pthread_mutex_lock進入臨界區後,若是沒有調用pthread_mutex_unlock釋放鎖退出,那麼其它線程就會一直阻塞在臨界區以外,咱們把這種狀況稱之爲死鎖。因此臨界區圍住必定要lock和unlock一一對應。

接下來看一下代碼示例:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
void * thread_inc(void * arg);
void * thread_des(void * arg);

long long num = 0;
pthread_mutex_t mutex;

int main(int argc, char *argv[])
{
    pthread_t thread_id[NUM_THREAD];
    int i;

    //互斥量的建立
    pthread_mutex_init(&mutex, NULL);

    for(i = 0; i < NUM_THREAD; i++)
    {
        if(i %2)
            pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);
        else
            pthread_create(&(thread_id[i]), NULL, thread_des, NULL);
    }

    for (i = 0; i < NUM_THREAD; i++)
        pthread_join(thread_id[i], NULL);

    printf("result: %lld \n", num);
    pthread_mutex_destroy(&mutex);   //互斥量的銷燬
    return 0;
}

/*擴展臨界區,減小加鎖,釋放鎖調用次數,但這樣變量必須加滿到50000000次後其它線程才能訪問.
 這樣是延長了線程的等待時間,但縮短了加鎖,釋放鎖函數調用的時間,這裏沒有定論,本身酌情考慮*/
void * thread_inc(void * arg)
{
    pthread_mutex_lock(&mutex);  //互斥量鎖住
    for (int i = 0; i < 1000000; i++)
        num += 1;
    pthread_mutex_unlock(&mutex); //互斥量釋放鎖
    return NULL;
}

/*縮短了線程等待時間,但循環建立,釋放鎖函數調用時間增長*/
void * thread_des(void * arg)
{
    for (int i = 0; i < 1000000; i++)
    {
        pthread_mutex_lock(&mutex);
        num -= 1;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

編譯運行能夠獲得結果爲:0

 

信號量
信號量與互斥量相似,只是互斥量是用鎖來控制線程訪問而信號量是用二進制0,1來完成控制線程順序。sem_post信號量加1,sem_wait信號量減1,當信號量爲0時,sem_wait就會阻斷,所以經過這樣讓信號量加1減1就能控制線程的執行順序了。
註釋:mac上測試信號量函數返回-1失敗,之後仍是Linux上整吧,也許這些接口已通過時了…

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);//建立信號量
int sem_destroy(sem_t *sem);//銷燬信號量
int sem_post(sem_t *sem);//信號量加1
int sem_wait(sem_t *sem);//信號量減1,爲0時阻塞

實例代碼:線程A從用戶輸入獲得值後存入全局變量num,此時線程B將取走該值並累加。該過程共進行5次,完成後輸出總和並退出程序。

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

void * read(void * arg);
void * accu(void * arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;


int main(int argc, char *argv[])
{
    pthread_t id_t1, id_t2;
    sem_init(&sem_one, 0, 0);
    sem_init(&sem_two, 0, 1);

    pthread_create(&id_t1, NULL, read, NULL);
    pthread_create(&id_t2, NULL, accu, NULL);

    pthread_join(id_t1, NULL);
    pthread_join(id_t2, NULL);

    sem_destroy(&sem_one);
    sem_destroy(&sem_two);

    return 0;
}

void * read(void * arg)
{
    int i;
    for (i = 0; i < 5; i++) {
        fputs("Input num: ", stdout);
        sem_wait(&sem_two);
        scanf("%d", &num);
        sem_post(&sem_one);
    }
    return NULL;
}

void * accu(void * arg)
{
    int sum = 0 , i;
    for (i = 0; i < 5; i++) {
        sem_wait(&sem_one);
        sum+= num;
        sem_post(&sem_two);
    }
    printf("Result: %d \n", sum);
    return NULL;
}

補充:線程的銷燬,線程建立後並非其入口函數返回後就會自動銷燬,須要手動銷燬,否則線程建立的內存空間將一直存在。通常手動銷燬有以下兩種方式:1,調用pthread_join函數,其返回後同時銷燬線程 ,是一個阻斷函數,服務端通常不用它銷燬,由於服務端主線程不宜阻斷,還要實時監聽客服端鏈接。2,調用pthread_detach函數,不會阻塞,線程返回自動銷燬線程,不過要注意調用它後不能再調用pthread_join函數,它與pthread_join主要區別就是一個是阻塞函數,一個不阻塞。

 

四.多線程併發服務端的實現

使用多線程實現了一個簡單的聊天程序,並對臨界區(clnt_cnt,clnt_socks)進行加鎖訪問.

  • 服務端:

 

//
//  main.cpp
//  hello_server
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//臨界區是:clnt_cnt和clnt_socks訪問處

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define MAX_CLNT 256

void * handle_clnt(void * arg);
void send_msg(char *msg, int len);
void error_handling(char * msg);
int clnt_cnt = 0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    socklen_t clnt_adr_sz;
    pthread_t t_id;
    if (argc != 2) {
        printf("Usage : %s <port> \n", argv[0]);
        exit(1);
    }

    //建立互斥量
    pthread_mutex_init(&mutx, NULL);
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);

    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_adr.sin_port = htons(atoi(argv[1]));

    if(bind(serv_sock, (struct sockaddr *) &serv_adr, sizeof(serv_adr)) == -1)
        error_handling("bind() error");
    if(listen(serv_sock, 5) == -1)
        error_handling("listen() error");

    while (1)
    {
        clnt_adr_sz = sizeof(clnt_adr);
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz); //阻斷,監聽客服端鏈接請求

        //臨界區
        pthread_mutex_lock(&mutx); //加鎖
        clnt_socks[clnt_cnt++] = clnt_sock; //新鏈接的客服端保存到clnt_socks數組裏
        pthread_mutex_unlock(&mutx); //釋放鎖

        //建立線程
        pthread_create(&t_id, NULL, handle_clnt, (void*) &clnt_sock);
        pthread_detach(t_id); //銷燬線程,線程return後自動調用銷燬,不阻斷

        printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));
    }

    close(serv_sock);
    return 0;
}

//線程執行
void * handle_clnt(void * arg)
{
    int clnt_sock = *((int *)arg);
    int str_len = 0, i;
    char msg[BUF_SIZE];

    while ((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)
        send_msg(msg, str_len);

    //從數組中移除當前客服端
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
    {
        if (clnt_sock == clnt_socks[i])
        {
            while (i++ < clnt_cnt - 1)
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_cnt--;
    pthread_mutex_unlock(&mutx);
    close(clnt_sock);
    return NULL;
}

//向全部鏈接的客服端發送消息
void send_msg(char * msg, int len)
{
    int i;
    pthread_mutex_lock(&mutx);
    for (i = 0; i < clnt_cnt; i++)
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

 

客戶端:

//
//  main.cpp
//  hello_client
//
//  Created by app05 on 15-10-22.
//  Copyright (c) 2015年 app05. All rights reserved.
//
//

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100
#define NAME_SIZE 20

void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char *message);

char name[NAME_SIZE] = "[DEFAULT]";
char msg[BUF_SIZE];

int main(int argc, const char * argv[]) {
    int sock;
    struct sockaddr_in serv_addr;
    pthread_t snd_thread, rcv_thread;
    void * thread_return;

    if(argc != 4)
    {
        printf("Usage: %s <IP> <port> \n", argv[0]);
        exit(1);
    }

    sprintf(name, "[%s]", argv[3]); //聊天人名字,配置到編譯器參數裏
    sock = socket(PF_INET, SOCK_STREAM, 0);
    if(sock == -1)
        error_handling("socket() error");

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);
    serv_addr.sin_port = htons(atoi(argv[2]));

    if (connect(sock, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");

    //多線程分離輸入和輸出
    pthread_create(&snd_thread, NULL, send_msg, (void *)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void *)&sock);
    //阻塞,等待返回
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);

    close(sock);
    return 0;
}

//發送消息
void * send_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    while (1) {
        fgets(msg, BUF_SIZE, stdin);
        if (!strcmp(msg, "q\n") || !strcmp(msg, "Q \n")) {
            close(sock);
            exit(0);
        }
        sprintf(name_msg, "%s  %s", name, msg);
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;
}

//接收消息
void * recv_msg(void * arg)
{
    int sock = *((int *)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len;
    while (1) {
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        if(str_len == -1)
            return (void *)-1;
        name_msg[str_len] = 0;
        fputs(name_msg, stdout);
    }
    return NULL;
}

void error_handling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

 

 

windows下:

 

一.線程概述

 

理解Windows內核對象

 

線程是系統內核對象之一。在學習線程以前,應先了解一下內核對象。內核對象是系統內核分配的一個內存塊,該內存塊描述的是一個數據結構,其成員負責維護對象的各類信息。內核對象的數據只能由系統內核來訪問,應用程序沒法在內存中找到這些數據結構並直接改變他們的內容。

 

經常使用的系統內核對象有事件對象、文件對象、做業對象、互斥對象、管道對象、進程對象和線程對象等。不一樣類型的內核對象,其數據結構各有不一樣。

 

理解進程和線程

 

進程被認爲是一個正在運行的程序的實例,它也屬於系統內核對象。能夠將進程簡單的理解爲一個容器,它只是提供空間,執行程序的代碼是由線程來實現的。線程存在於進程中,它負責執行進程地址空間中的代碼。當一個進程建立時,系統會自動爲其建立一個線程,該線程被稱爲主線程。在主線程中用戶能夠經過代碼建立其餘線程,當進程中的主線程結束時,進程也就結束了。

 

線程的建立

 

Windows下,建立線程有多種方式,如下將逐一介紹。注意它們的區別。

 

使用CreateThread函數建立線程

 

Windows API函數。該函數在主線程的基礎上建立一個新線程。微軟在Windows API中提供了創建新的線程的函數CreateThread。

HANDLECreateThread( LPSECURITY_ATTRIBUTES lpThreadAttributes,//線程安全屬性 DWORD dwStackSize,//堆棧大小 LPTHREAD_START_ROUTINE lpStartAddress,//線程函數 LPVOID lpParameter,//線程參數 DWORD dwCreationFlags,//線程建立屬性 LPDWORD lpThreadId//線程ID );
 

示例代碼:
#include "stdafx.h"
#include<iostream>
#include<Windows.h>
using namespace std;

DWORD WINAPI Fun1Proc(LPVOID lpParameter)
{
    cout << "thread function Fun1Proc!\n";

    return 0;
}

int main()
{
    HANDLE hThread1 = CreateThread(NULL, 0, Fun1Proc, NULL, 0, NULL);
    CloseHandle(hThread1);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}

結果圖:

 

使用_beginthreadex函數建立線程

除了使用CreateThread API函數建立線程外,還能夠用C++語言提供的_beginthreadex函數來建立線程。

uintptr_t _beginthreadex( // NATIVE CODE void *security, //線程安全屬性 unsigned stack_size, //線程的棧大小 unsigned ( *start_address )( void * ),//線程函數 void *arglist, //傳遞到線程函數中的參數 unsigned initflag, //線程初始化標記 unsigned *thrdaddr //線程ID ); 

 

 示例代碼:
#include "stdafx.h"
#include<iostream>
#include<Windows.h>
#include<process.h>
using namespace std;

unsigned int _stdcall ThreadProc(LPVOID lpParameter)
{
    cout << "thread function ThreadProc!\n";
    return 0;
}

int main()
{
    _beginthreadex(NULL, 0, ThreadProc, 0, 0, NULL);

    Sleep(1000);
    cout << "main end!\n";
    system("pause");
    return 0;
}
 

二.線程同步

 

爲何要進行線程同步?

  在程序中使用多線程時,通常不多有多個線程能在其生命期內進行徹底獨立的操做。更多的狀況是一些線程進行某些處理操做,而其餘的線程必須對其處理結果進行了解。正常狀況下對這種處理結果的瞭解應當在其處理任務完成後進行。 
  若是不採起適當的措施,其餘線程每每會在線程處理任務結束前就去訪問處理結果,這就頗有可能獲得有關處理結果的錯誤瞭解。例如,多個線程同時訪問同一個全局變量,若是都是讀取操做,則不會出現問題。若是一個線程負責改變此變量的值,而其餘線程負責同時讀取變量內容,則不能保證讀取到的數據是通過寫線程修改後的。 
  爲了確保讀線程讀取到的是通過修改的變量,就必須在向變量寫入數據時禁止其餘線程對其的任何訪問,直至賦值過程結束後再解除對其餘線程的訪問限制。這種保證線程能瞭解其餘線程任務處理結束後的處理結果而採起的保護措施即爲線程同步。

代碼示例: 
兩個線程同時對一個全局變量進行加操做,演示了多線程資源訪問衝突的狀況。

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1;
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

 

能夠看到有時兩個線程計算的值相同,這就跟上面Linux下建立一百個線程將數字加減爲0沒成功同樣的道理,都是訪問內存的時候衝突了。

爲何會出現這種狀況呢,來舉個例子:

 

 

如上圖所示:兩個線程都要將某一個共同訪問的變量加1,

 

 

 

就像上面說的這個運算過程是:線程1先拿到值而後通過cpu的運算在賦值回去,而後線程2在取值運算放回,上圖實現的是最理想的狀況,假如這時候線程一拿到了值99,同時線程二沒間隔的也拿了99,這時候就要出問題了。線程一運算後賦值100回去,而後線程二運算後又賦值100回去,,,注意了哈,這裏兩個線程都是爲了Num++服務,他們這樣搞事情不就表明一個作了無用功嗎?(我胖虎要是還拿的動刀還不打死你!!!)

 

這些看完應該就理解了爲何須要線程同步!!!!以及線程同步的重要性了吧!!

 

關於線程同步

線程之間通訊的兩個基本問題是互斥和同步。

  • 線程同步是指線程之間所具備的一種制約關係,一個線程的執行依賴另外一個線程的消息,當它沒有獲得另外一個線程的消息時應等待,直到消息到達時才被喚醒。
  • 線程互斥是指對於共享的操做系統資源(指的是廣義的」資源」,而不是Windows的.res文件,譬如全局變量就是一種共享資源),在各線程訪問時的排它性。當有若干個線程都要使用某一共享資源時,任什麼時候刻最多隻容許一個線程去使用,其它要使用該資源的線程必須等待,直到佔用資源者釋放該資源。

線程互斥是一種特殊的線程同步。實際上,互斥和同步對應着線程間通訊發生的兩種狀況:

  • 當有多個線程訪問共享資源而不使資源被破壞時;
  • 當一個線程須要將某個任務已經完成的狀況通知另一個或多個線程時。

從大的方面講,線程的同步可分用戶模式的線程同步和內核對象的線程同步兩大類。

  • 用戶模式中線程的同步方法主要有原子訪問和臨界區等方法。其特色是同步速度特別快,適合於對線程運行速度有嚴格要求的場合。
  • 內核對象的線程同步則主要由事件、等待定時器、信號量以及信號燈等內核對象構成。因爲這種同步機制使用了內核對象,使用時必須將線程從用戶模式切換到內核模式,而這種轉換通常要耗費近千個CPU週期,所以同步速度較慢,但在適用性上卻要遠優於用戶模式的線程同步方式。

在WIN32中,同步機制主要有如下幾種: 
(1)事件(Event); 
(2)信號量(semaphore); 
(3)互斥量(mutex); 
(4)臨界區(Critical section)。

Win32中的四種同步方式

臨界區

臨界區(Critical Section)是一段獨佔對某些共享資源訪問的代碼,在任意時刻只容許一個線程對共享資源進行訪問。若是有多個線程試圖同時訪問臨界區,那麼在有一個線程進入後其餘全部試圖訪問此臨界區的線程將被掛起,並一直持續到進入臨界區的線程離開。臨界區在被釋放後,其餘線程能夠繼續搶佔,並以此達到用原子方式操做共享資源的目的。

臨界區在使用時以CRITICAL_SECTION結構對象保護共享資源,並分別用EnterCriticalSection()和LeaveCriticalSection()函數去標識和釋放一個臨界區。所用到的CRITICAL_SECTION結構對象必須通過InitializeCriticalSection()的初始化後才能使用,並且必須確保全部線程中的任何試圖訪問此共享資源的代碼都處在此臨界區的保護之下。不然臨界區將不會起到應有的做用,共享資源依然有被破壞的可能。

代碼示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全局變量
CRITICAL_SECTION Critical;      //定義臨界區句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        EnterCriticalSection(&Critical);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        LeaveCriticalSection(&Critical);
    }
 
    return 0;
}
 
int main()
{
    InitializeCriticalSection(&Critical);   //初始化臨界區對象
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

問題解決!!!

 

 

事件

事件(Event)是WIN32提供的最靈活的線程間同步方式,事件能夠處於激發狀態(signaled or true)或未激發狀態(unsignal or false)。根據狀態變遷方式的不一樣,事件可分爲兩類: 
(1)手動設置:這種對象只可能用程序手動設置,在須要該事件或者事件發生時,採用SetEvent及ResetEvent來進行設置。 
(2)自動恢復:一旦事件發生並被處理後,自動恢復到沒有事件狀態,不須要再次設置。

使用」事件」機制應注意如下事項: 
(1)若是跨進程訪問事件,必須對事件命名,在對事件命名的時候,要注意不要與系統命名空間中的其它全局命名對象衝突; 
(2)事件是否要自動恢復; 
(3)事件的初始狀態設置。

因爲event對象屬於內核對象,故進程B能夠調用OpenEvent函數經過對象的名字得到進程A中event對象的句柄,而後將這個句柄用於ResetEvent、SetEvent和WaitForMultipleObjects等函數中。此法能夠實現一個進程的線程控制另外一進程中線程的運行,例如:

HANDLE hEvent=OpenEvent(EVENT_ALL_ACCESS,true,"MyEvent"); 
ResetEvent(hEvent);

示例代碼:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全局變量
HANDLE hEvent;  //定義事件句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待對象爲有信號狀態
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hEvent, INFINITE);  //等待對象爲有信號狀態
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        SetEvent(hEvent);
    }
 
    return 0;
}
 
int main()
{
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
    hEvent = CreateEvent(NULL, FALSE, TRUE, "event");
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

運行結果都同樣就不來顯示出來了。

信號量

信號量是維護0到指定最大值之間的同步對象。信號量狀態在其計數大於0時是有信號的,而其計數是0時是無信號的。信號量對象在控制上能夠支持有限數量共享資源的訪問。

信號量的特色和用途可用下列幾句話定義: 
(1)若是當前資源的數量大於0,則信號量有效; 
(2)若是當前資源數量是0,則信號量無效; 
(3)系統決不容許當前資源的數量爲負值; 
(4)當前資源數量決不能大於最大資源數量。

建立信號量

函數原型爲:

 HANDLE CreateSemaphore (
   PSECURITY_ATTRIBUTE psa, //信號量的安全屬性
   LONG lInitialCount, //開始時可供使用的資源數
   LONG lMaximumCount, //最大資源數
   PCTSTR pszName);     //信號量的名稱

釋放信號量

經過調用ReleaseSemaphore函數,線程就可以對信標的當前資源數量進行遞增,該函數原型爲:

BOOL WINAPI ReleaseSemaphore(
   HANDLE hSemaphore,   //要增長的信號量句柄
   LONG lReleaseCount, //信號量的當前資源數增長lReleaseCount
   LPLONG lpPreviousCount  //增長前的數值返回
   );

打開信號量 

和其餘核心對象同樣,信號量也能夠經過名字跨進程訪問,打開信號量的API爲:

 HANDLE OpenSemaphore (
   DWORD fdwAccess,      //access
   BOOL bInherithandle,  //若是容許子進程繼承句柄,則設爲TRUE
   PCTSTR pszName  //指定要打開的對象的名字
  );

代碼示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全局變量
HANDLE hSemaphore;  //定義信號量句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待信號量爲有信號狀態
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    long count;
    while (number < 100)
    {
        WaitForSingleObject(hSemaphore, INFINITE);  //等待信號量爲有信號狀態
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseSemaphore(hSemaphore, 1, &count);
    }
 
    return 0;
}
 
int main()
{
    hSemaphore = CreateSemaphore(NULL, 1, 100, "sema");
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

結果同樣。

 

互斥量

採用互斥對象機制。 只有擁有互斥對象的線程纔有訪問公共資源的權限,由於互斥對象只有一個,因此能保證公共資源不會同時被多個線程訪問。互斥不只能實現同一應用程序的公共資源安全共享,還能實現不一樣應用程序的公共資源安全共享。

代碼示例:

#include "stdafx.h"
#include<windows.h>
#include<iostream>
using namespace std;
 
int number = 1; //定義全局變量
HANDLE hMutex;  //定義互斥對象句柄
 
unsigned long __stdcall ThreadProc1(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 1 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
unsigned long __stdcall ThreadProc2(void* lp)
{
    while (number < 100)
    {
        WaitForSingleObject(hMutex, INFINITE);
        cout << "thread 2 :"<<number << endl;
        ++number;
        _sleep(100);
        ReleaseMutex(hMutex);
    }
 
    return 0;
}
 
int main()
{
    hMutex = CreateMutex(NULL, false, "mutex");     //建立互斥對象
 
    CreateThread(NULL, 0, ThreadProc1, NULL, 0, NULL);
    CreateThread(NULL, 0, ThreadProc2, NULL, 0, NULL);
 
    Sleep(10*1000);
 
    system("pause");
    return 0;
}

結果同樣的。

 

三.多線程+IOCP實現服務端

(1)爲何使用IOCP模型。

socket是內核對象句柄,每次對socket執行操做,須要用戶對象到內核對象的轉換,執行完成返回結果,須要內核對象到用戶對象的轉換。
IOCP的中文名稱是完成端口,目前是Windows下最高效的網絡模型。特色:半異步,非阻塞。(我理解的徹底異步是回調式,不須要人工參與,可是IOCP的異步須要輪詢)。

        其餘模型的缺點:

1)select模型:最低效,每次檢索對長度有限制(默認是64個連接),能夠經過修改頭文件的方式修改上限,須要手動循環查詢是否有操做可執行,因此很低效;

2)WSAEvent,事件模型,缺點也是有上限,每次最多監聽64個事件,在收到事件通知後,去手動recv數據,效率比select高許多,由於操做是系統消息通知的,能夠實現異步;
3)完成例程模型,是對事件模型的改進,去掉了64個事件的上限


以上模型還有個缺點,就是每次有操做可執行時,須要手動去執行recv或者accept等操做,涉及到內核對象<->用戶對象的兩次切換(訂製獲取消息時一次,recv/accept操做一次),並且對於accept來講,每次手動調用,都會產生一個socket,當大量accept來到時,產生socket的過程會很是耗時。
        知道其餘模型的缺點,就知道了完成端口的優勢:1)沒有監聽上限;2)對於accept來講,socket是提早創建準備好的,收到鏈接時直接返回以前傳入的socket;3)只涉及到一次內核對象<->用戶對象切換(訂製消息時一次),由於在訂製消息的時候,已經把數據緩存地址給了內核對象,內核對象在收到數據、寫入緩存後,才切換回用戶對象,讓用戶拿走數據。總的來講,完成端口是proactor模型,其餘的是reactor模型。

 

 

(2)IOCP理解與應用。

 

扯遠點。首先傳統服務器的網絡IO流程以下:
接到一個客戶端鏈接->建立一個線程負責這個鏈接的IO操做->持續對新線程進行數據處理->所有數據處理完畢->終止線程。
可是這樣的設計代價是:

 

  • 1:每一個鏈接建立一個線程,將致使過多的線程。
    • 2:維護線程所消耗的堆棧內存過大。
      • 3:操做系統建立和銷燬線程過大。
        • 4:線程之間切換的上下文代價過大。

 

 

 


此時咱們能夠考慮使用線程池解決其中3和4的問題。這種傳統的服務器網絡結構稱之爲會話模型。
後來咱們爲防止大量線程的維護,建立了I/O模型,它被但願要求能夠:
1:容許一個線程在不一樣時刻給多個客戶端進行服務。
2:容許一個客戶端在不一樣時間被多個線程服務。

 


這樣作的話,咱們的線程則會大幅度減小,這就要求如下兩點:
1:客戶端狀態的分離,以前會話模式咱們能夠經過線程狀態得知客戶端狀態,但如今客戶端狀態要經過其餘方式獲取。
2:I/O請求的分離。一個線程再也不服務於一個客戶端會話,則要求客戶端對這個線程提交I/O處理請求。

 

那麼就產生了這樣一個模式,分爲三部分:

 

  • 1:會話狀態管理模塊。它負責接收到一個客戶端鏈接,就建立一個會話狀態。
    • 2:當會話狀態發生改變,例如斷掉鏈接,接收到網絡消息,就發送一個I/O請求給 I/O工做模塊進行處理。
      • 3:I/O工做模塊接收到一個I/O請求後,從線程池裏喚醒一個工做線程,讓該工做線程處理這個I/O請求,處理完畢後,該工做線程繼續掛起。

 

 

 

上面的作法,則將網絡鏈接 和I/O工做線程分離爲三個部分,相互通信僅依靠 I/O請求。此時可知有如下一些建議:

 

  • 1:在進行I/O請求處理的工做線程是被喚醒的工做線程,一個CPU對應一個的話,能夠最大化利用CPU。因此 活躍線程的個數 建議等於 硬件CPU個數。
    • 2:工做線程咱們開始建立了線程池,免除建立和銷燬線程的代價。由於線程是對I/O進行操做的,且一一對應,那麼當I/O所有並行時,工做線程必須知足I/O並行操做需求,因此 線程池內最大工做線程個數 建議大於或者等於 I/O並行個數。
      • 3:可是咱們可知CPU個數又限制了活躍的線程個數,那麼線程池過大意義很低,因此按常規建議 線程池大小 等於 CPU個數*2 左右爲佳。例如,8核服務器建議建立16個工做線程的線程池。 上面描述的依然是I/O模型並不是IOCP,那麼IOCP是什麼呢,全稱 IO完成端口。

 

 

 

它是一種WIN32的網絡I/O模型,既包括了網絡鏈接部分,也負責了部分的I/O操做功能,用於方便咱們控制有併發性的網絡I/O操做。它有以下特色:

 

  • 1:它是一個WIN32內核對象,因此沒法運行於Linux.
    • 2:它本身負責維護了工做線程池,同時也負責了I/O通道的內存池。
      • 3:它本身實現了線程的管理以及I/O請求通知,最小化的作到了線程的上下文切換。
        • 4:它本身實現了線程的優化調度,提升了CPU和內存緩衝的使用率。

 

使用IOCP的基本步驟很簡單:

 

    • 1:建立IOCP對象,由它負責管理多個Socket和I/O請求。CreateIoCompletionPort須要將IOCP對象和IOCP句柄綁定。
      • 2:建立一個工做線程池,以便Socket發送I/O請求給IOCP對象後,由這些工做線程進行I/O操做。注意,建立這些線程的時候,將這些線程綁定到IOCP上。
        • 3:建立一個監聽的socket。
          • 4:輪詢,當接收到了新的鏈接後,將socket和完成端口進行關聯而且投遞給IOCP一個I/O請求。注意:將Socket和IOCP進行關聯的函數和建立IOCP的函數同樣,都是CreateIoCompletionPort,不過注意傳參必然是不一樣的。
            • 5:由於是異步的,咱們能夠去作其餘,等待IOCP將I/O操做完成會回饋咱們一個消息,咱們再進行處理。
              • 其中須要知道的是:I/O請求被放在一個I/O請求隊列裏面,對,是隊列,LIFO機制。當一個設備處理完I/O請求後,將會將這個完成後的I/O請求丟回IOCP的I/O完成隊列。
                • 咱們應用程序則須要在GetQueuedCompletionStatus去詢問IOCP,該I/O請求是否完成。
                  • 其中有一些特殊的事情要說明一下,咱們有時有須要人工的去投遞一些I/O請求,則須要使用PostQueuedCompletionStatus函數向IOCP投遞一個I/O請求到它的請求隊列中。

 

 

 

 

(3)IOCP----API詳解

 

(1) 完成端口實現的API
CreateIoCompletionPort

 

HANDLE WINAPI CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads ); 

 

 

 

返回值:若是函數成功,則返回值是I / O完成端口的句柄:若是函數失敗,則返回值爲NULL。
功能:兩個功能,建立完成端口句柄與將新的文件句柄(套接字)綁定到完成端口(咱們也能夠理解爲完成隊列,只是這個隊列由操做系統本身維護)

 

FileHandle:文件句柄或INVALID_HANDLE_VALUE。建立完成端口的時候,該值設置爲INVALID_HANDLE_VALUE,Ghost裏面時候的是一個臨時的socket句柄,不過咱們不用必定要這樣。
ExistingCompletionPort:現有I / O完成端口的句柄或NULL。若是此參數爲現有I / O完成端口,則該函數將其與FileHandle參數指定的句柄相關聯。若是成功則函數返回現有I / O完成端口的句柄。若是此參數爲NULL,則該函數將建立一個新的I / O完成端口,若是FileHandle參數有效,則將其與新的I / O完成端口相關聯。不然,不會發生文件句柄關聯。若是成功,該函數將把句柄返回給新的I / O完成端口。
CompletionKey:該值就是相似線程裏面傳遞的一個參數,咱們在GetQueuedCompletionStatus中第三個參數得到的就是這個值。
NumberOfConcurrentThreads:若是此參數爲NULL,則系統容許與系統中的處理器同樣多的併發運行的線程。若是ExistingCompletionPort參數不是NULL,則忽略此參數。

 

 

 

GetQueuedCompletionStatus

 

BOOL WINAPI GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytes, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped, _In_ DWORD dwMilliseconds ); 

 

 

 

返回值:成功返回TRUE,失敗返回FALSE,若是設置了超時時間,超時返回FALSE
功能:從完成端口中獲取已經完成的消息

 

CompletionPort:完成端口的句柄。
lpNumberOfBytes:該變量接收已完成的I / O操做期間傳輸的字節數。
lpCompletionKey:該變量及時咱們 CreateIoCompletionPort中傳遞的第三個參數
lpOverlapped:接收完成的I / O操做啓動時指定的OVERLAPPED結構的地址。咱們能夠經過CONTAINING_RECORD這個宏獲取以該重疊結構爲首地址的結構體信息,也就是該重疊結構爲何必須放在結構體的首地址的緣由。
dwMilliseconds:超時時間(毫秒),若是爲INFINITE則一直等待直到有消息到來。

 

備註:   CreateIoCompletionPort 提供這個功能:I/O系統能夠被用來向列隊的I/O完成端口發送I/O完成通知包。當 你執行一個已經關聯一個完成端口的文件I/O操做,I/O系統將會在這個I/O操做完成的時候向I/O完成端口發送一個完成通知包,I/O完成端口將以先 進先出的方式放置這個I/O完成通知包,並使用GetQueuedCompletionStatus 接收I/O完成通知包。  

 雖然容許任何數量的 線程來調用 GetQueuedCompletionStatus 等待一個I/O完成端口,但每一個線程只能同時間內關聯一個I/O完成端口,且此端口是線程最後檢查的那個端口。  

 當一個包被放入隊列中,系統首先會 檢查有多少個關聯此端口的線程在運行,若是運行的線程的數量少於NumberOfConcurrentThreads的值,那麼容許其中的一個等 待線程去處理包。當一個運行的線程完成處理,將再次調用GetQueuedCompletionStatus ,此時系統容許另外一個等待線程去處理包。  

 系 統也容許一個等待的線程處理包若是運行的線程進入任何形式的等待狀態,當這個線程從等待狀態進入運行狀態,可能會有一個很短的時期活動線程的數量會超過 NumberOfConcurrentThreads 的值,此時,系統會經過不容許任何新的活動線程快速的減小線程個數,直到活動線程少於NumberOfConcurrentThreads 的值。

 

 

PostQueuedCompletionStatus

 

BOOL WINAPI PostQueuedCompletionStatus( _In_ HANDLE CompletionPort, _In_ DWORD dwNumberOfBytesTransferred, _In_ ULONG_PTR dwCompletionKey, _In_opt_ LPOVERLAPPED lpOverlapped ); 

 

 

 

返回值:成功,返回非零,失敗返回零。使用GetLasrError獲取最後的錯誤碼
功能:手動向完成端口投遞一個異步消息。就相似咱們Win32中的PostMessage

 

CompletionPort:完成端口的句柄。
dwNumberOfBytesTransferred:經過GetQueuedCompletionStatus函數的lpNumberOfBytesTransferred參數返回的值。
dwCompletionKey:經過GetQueuedCompletionStatus函數的lpCompletionKey參數返回的值。
lpOverlapped:經過GetQueuedCompletionStatus函數的lpOverlapped參數返回的值。

 

能夠看到上面後三個參數均可以傳遞給GetQueuedCompletionStatus,這樣—來。—個工做者線程收到傳遞過來的三個GetQueuedCompletionStatus函數參數後,即可根據由這三個參數的某一個設置的特殊值,決定什麼時候應該退出。例如,可用dwCompletionPort參數傳遞0值,而—個工做者線程會將其解釋成停止指令。一旦全部工做者線程都已關閉,即可使用CloseHandle函數,關閉完成端口。最終安全退出程序。
  PostQueuedCompletionStatus函數提供了一種方式來與線程池中的全部線程進行通訊。如,當用戶終止服務應用程序時,咱們想要全部線程都徹底利索地退出。可是若是各線程還在等待完成端口而又沒有已完成的I/O 請求,那麼它們將沒法被喚醒。
  經過爲線程池中的每一個線程都調用一次PostQueuedCompletionStatus,咱們能夠將它們都喚醒。每一個線程會對GetQueuedCompletionStatus的返回值進行檢查,若是發現應用程序正在終止,那麼它們就能夠進行清理工做並正常地退出。

 

 

CONTAINING_RECORD

 

PCHAR CONTAINING_RECORD( [in] PCHAR Address, [in] TYPE Type, [in] PCHAR Field ); 

 

 

 

功能:返回給定結構類型的結構實例的基地址和包含結構中字段的地址。
返回值:返回包含Field的結構的基地址。
Address:咱們經過GetQueuedCompletionStatus獲取的重疊結構
Type:以重疊結構爲首地址的結構體
Field:Type結構體的重疊結構變量

 

(2)相關其餘函數
AcceptEx

 

BOOL AcceptEx( _In_ SOCKET sListenSocket, _In_ SOCKET sAcceptSocket, _In_ PVOID lpOutputBuffer, _In_ DWORD dwReceiveDataLength, _In_ DWORD dwLocalAddressLength, _In_ DWORD dwRemoteAddressLength, _Out_ LPDWORD lpdwBytesReceived, _In_ LPOVERLAPPED lpOverlapped ); 

 

 

 

返回值:成功返回TRUE,失敗返回FALSE
功能:投遞異步的接收操做,相似於實現了一個網絡內存池,這個池中存放的是已經創造好的套接字(因爲要進行異步操做,因此該套接字也要使用WSASocket建立),當有用戶鏈接的時候,操做系統會直接從這個網絡內存池中拿出一個來給鏈接的客戶端,這個過程咱們少去了鏈接時才創造套接字的過程(建立一個套接字的過程內部是很複雜的),這也是這個函數優異的地方。

 

該函數的參數很明確,只是有些其他的話還須要提醒,AcceptEx該函數還須要經過函數指針得到,由於該函數不是windows自身的API。具體的獲取過程也只是循序漸進,MSDN有詳細的例子,示例代碼中也有詳細的過程,筆者就不贅述了。

 

 

AcceptEx函數

         使用Accept(或WSAAccept)接受鏈接,當併發鏈接數超過大概30000(這取決於系統資源)的時候,容易出現WSAENOBUFS(10055)錯誤。這種錯誤主要是由於系統不能及時爲新鏈接進來的客戶端分配socket資源。所以咱們應該找到一種的使用以前可以分配socket資源的方法。AcceptEx 就是咱們尋找的答案,它的主要優點就是在使用socket資源以前就會分分配好資源,它的其餘方面的特色就比較麻煩使人費解了。(參見MSDN庫。)

 

 

(4)實現代碼

 

服務端代碼:

 

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <process.h>
#include <winsock2.h>
#include <windows.h>
 
#pragma comment(lib,"ws2_32.lib");//加載ws2_32.dll
 
#define BUF_SIZE 100
#define READ    3
#define    WRITE    5
 
typedef struct    // socket info
{
    SOCKET hClntSock;
    SOCKADDR_IN clntAdr;
} PER_HANDLE_DATA, *LPPER_HANDLE_DATA;
 
typedef struct    // buffer info
{
    OVERLAPPED overlapped;
    WSABUF wsaBuf;
    char buffer[BUF_SIZE];
    int rwMode;    // READ or WRITE 讀寫模式
} PER_IO_DATA, *LPPER_IO_DATA;
 
unsigned int  WINAPI EchoThreadMain(LPVOID CompletionPortIO);
void ErrorHandling(char *message);
SOCKET ALLCLIENT[100];
int clientcount = 0;
HANDLE hMutex;//互斥量
 
int main(int argc, char* argv[])
{
 
    hMutex = CreateMutex(NULL, FALSE, NULL);//建立互斥量
 
    WSADATA    wsaData;
    HANDLE hComPort;
    SYSTEM_INFO sysInfo;
    LPPER_IO_DATA ioInfo;
    LPPER_HANDLE_DATA handleInfo;
 
    SOCKET hServSock;
    SOCKADDR_IN servAdr;
    int  i;
    DWORD recvBytes = 0,flags = 0;
 
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//建立CP對象
    GetSystemInfo(&sysInfo);//獲取當前系統的信息
 
    for (i = 0; i < sysInfo.dwNumberOfProcessors; i++)
        _beginthreadex(NULL, 0, EchoThreadMain, (LPVOID)hComPort, 0, NULL);//建立=CPU個數的線程數
 
    hServSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);//不是非阻塞套接字,可是重疊IO套接字。
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAdr.sin_port = htons(1234);
 
    bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr));
    listen(hServSock, 5);
 
    while (1)
    {
        SOCKET hClntSock;
        SOCKADDR_IN clntAdr;
        int addrLen = sizeof(clntAdr);
 
        hClntSock = accept(hServSock, (SOCKADDR*)&clntAdr, &addrLen);
 
        handleInfo = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));//和重疊IO同樣
        handleInfo->hClntSock = hClntSock;//存儲客戶端套接字
 
        WaitForSingleObject(hMutex, INFINITE);//線程同步
 
        ALLCLIENT[clientcount++] = hClntSock;//存入套接字隊列
 
        ReleaseMutex(hMutex);
 
        memcpy(&(handleInfo->clntAdr), &clntAdr, addrLen);
 
        CreateIoCompletionPort((HANDLE)hClntSock, hComPort, (DWORD)handleInfo, 0);//鏈接套接字和CP對象
                                                                                //已完成信息將寫入CP對象
        ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//存儲接收到的信息
        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
        ioInfo->wsaBuf.len = BUF_SIZE;
        ioInfo->wsaBuf.buf = ioInfo->buffer;//和重疊IO同樣
        ioInfo->rwMode = READ;//讀寫模式
 
        WSARecv(handleInfo->hClntSock, &(ioInfo->wsaBuf),//非阻塞模式
            1, &recvBytes, &flags, &(ioInfo->overlapped), NULL);
    }
    CloseHandle(hMutex);//銷燬互斥量
    return 0;
}
 
unsigned int WINAPI EchoThreadMain(LPVOID pComPort)//線程的執行
{
    HANDLE hComPort = (HANDLE)pComPort;
    SOCKET sock;
    DWORD bytesTrans;
    LPPER_HANDLE_DATA handleInfo;
    LPPER_IO_DATA ioInfo;
    DWORD flags = 0;
 
    while (1)//大循環
    {
        GetQueuedCompletionStatus(hComPort, &bytesTrans,//確認「已完成」的I/O!!
            (LPDWORD)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);//INFINITE使用時,程序將阻塞,直到已完成的I/O信息寫入CP對象
        sock = handleInfo->hClntSock;//客戶端套接字
 
        if (ioInfo->rwMode == READ)//讀寫模式(此時緩衝區有數據)
        {
            puts("message received!");
            if (bytesTrans == 0)    // 鏈接結束
            {
                WaitForSingleObject(hMutex, INFINITE);//線程同步
 
                closesocket(sock);
                int i = 0;
                while (ALLCLIENT[i] == sock){ i++; }
                ALLCLIENT[i] = 0;//斷開置0
 
                ReleaseMutex(hMutex);
 
                free(handleInfo); free(ioInfo);
                continue;
            }
            int i = 0;
 
            for (; i < clientcount;i++)
            {
                if (ALLCLIENT[i] != 0)//判斷是否爲已鏈接的套接字
                {
                    if (ALLCLIENT[i] != sock)
                    {
                        LPPER_IO_DATA newioInfo;
                        newioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//動態分配內存
                        memset(&(newioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        strcpy(newioInfo->buffer, ioInfo->buffer);//從新構建新的內存,防止屢次釋放free
                        newioInfo->wsaBuf.buf = newioInfo->buffer;
                        newioInfo->wsaBuf.len = bytesTrans;
                        newioInfo->rwMode = WRITE;
 
                        WSASend(ALLCLIENT[i], &(newioInfo->wsaBuf),//回聲
                            1, NULL, 0, &(newioInfo->overlapped), NULL);
                    }
                    else
                    {
                        memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
                        ioInfo->wsaBuf.len = bytesTrans;
                        ioInfo->rwMode = WRITE;
                        WSASend(ALLCLIENT[i], &(ioInfo->wsaBuf),//回聲
                            1, NULL, 0, &(ioInfo->overlapped), NULL);
                    }
                }
            }
            ioInfo = (LPPER_IO_DATA)malloc(sizeof(PER_IO_DATA));//動態分配內存
            memset(&(ioInfo->overlapped), 0, sizeof(OVERLAPPED));
            ioInfo->wsaBuf.len = BUF_SIZE;
            ioInfo->wsaBuf.buf = ioInfo->buffer;
            ioInfo->rwMode = READ;
            WSARecv(sock, &(ioInfo->wsaBuf),//再非阻塞式接收
                1, NULL, &flags, &(ioInfo->overlapped), NULL);
        }
        else
        {
            puts("message sent!");
            free(ioInfo);
        }
    }
    return 0;
}
 
void ErrorHandling(char *message)
{
    fputs(message, stderr);
    fputc('\n', stderr);
    exit(1);
}

 

 

客戶端:

#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <process.h> 
#define BUF_SIZE 1000
#define NAME_SIZE 20
 
#pragma comment(lib, "ws2_32.lib")  //加載 ws2_32.dll  
 
unsigned WINAPI SendMsg(void * arg);//發送信息函數
unsigned WINAPI RecvMsg(void * arg);//接受信息函數
void ErrorHandling(char * msg);//錯誤返回函數
 
int haveread = 0;
char NAME[50];//[名字]
char ANAME[50];
char msg[BUF_SIZE];//信息
 
int main(int argc, char *argv[])
{
 
    printf("請輸入網名:");
    scanf("%s", NAME);
    WSADATA wsaData;
    SOCKET hSock;
    SOCKADDR_IN servAdr;
    HANDLE hSndThread, hRcvThread;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        ErrorHandling("WSAStartup() error!");
 
    hSock = socket(PF_INET, SOCK_STREAM, 0);
    memset(&servAdr, 0, sizeof(servAdr));
    servAdr.sin_family = AF_INET;
    servAdr.sin_addr.s_addr = inet_addr("127.0.0.1");
    servAdr.sin_port = htons(1234);
 
    if (connect(hSock, (SOCKADDR*)&servAdr, sizeof(servAdr)) == SOCKET_ERROR)
        ErrorHandling("connect() error");
 
    int resultsend;
    puts("Welcome to joining our chatting room!\n");
    sprintf(ANAME, "[%s]", NAME);
 
    hSndThread =
        (HANDLE)_beginthreadex(NULL, 0, SendMsg, (void*)&hSock, 0, NULL);//寫線程
    hRcvThread =
        (HANDLE)_beginthreadex(NULL, 0, RecvMsg, (void*)&hSock, 0, NULL);//讀線程
 
    WaitForSingleObject(hSndThread, INFINITE);//等待線程結束
    WaitForSingleObject(hRcvThread, INFINITE);
    closesocket(hSock);
    WSACleanup();
    system("pause");
    return 0;
}
 
unsigned WINAPI SendMsg(void * arg)   // send thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    char padd[2];
    fgets(padd, 2, stdin);//多餘的'\n'
    printf("\n send message:");
    while (1)
    {
        {
            fgets(msg, BUF_SIZE, stdin);
            if (!strcmp(msg, "q\n") || !strcmp(msg, "Q\n"))
            {
                closesocket(sock);
                exit(0);
            }
            sprintf(name_msg, "[%s] %s", NAME, msg);
            char numofmsg = strlen(name_msg) + '0';
            char newmsg[100]; newmsg[0] = numofmsg; newmsg[1] = 0;//第一個字符表示消息的長度
            strcat(newmsg, name_msg);
            int result = send(sock, newmsg, strlen(newmsg), 0);
            if (result == -1)return -1;//發送錯誤
        }
    }
    return NULL;
}
 
unsigned WINAPI RecvMsg(void * arg)  // read thread main
{
    SOCKET sock = *((SOCKET*)arg);
    char name_msg[NAME_SIZE + BUF_SIZE];
    int str_len = 0;
    while (1)
    {
        {
            char lyfstr[1000] = { 0 };
            int totalnum = 0;
            str_len = recv(sock, name_msg, 1, 0);//讀取第一個字符!獲取消息的長度
            if (str_len == -1)//讀取錯誤
            {
                printf("return -1\n");
                return -1;
            }
            if (str_len == 0)//讀取結束
            {
                printf("return 0\n");
                return 0;//讀取結束
            }
            totalnum = name_msg[0] - '0';
            int count = 0;
 
            do
            {
                str_len = recv(sock, name_msg, 1, 0);
 
                name_msg[str_len] = 0;
 
                if (str_len == -1)//讀取錯誤
                {
                    printf("return -1\n");
                    return -1;
                }
                if (str_len == 0)
                {
                    printf("return 0\n");
                    return 0;//讀取結束
                }
                strcat(lyfstr, name_msg);
                count = str_len + count;
 
            } while (count < totalnum);
 
            lyfstr[count] = '\0';
            printf("\n");
            strcat(lyfstr, "\n");
            fputs(lyfstr, stdout);
            printf(" send message:");
            fflush(stdout);
            memset(name_msg, 0, sizeof(char));
        }
    }
    return NULL;
}
 
void ErrorHandling(char * msg)
{
    fputs(msg, stderr);
    fputc('\n', stderr);
    exit(1);
}

 

 最後說一句啦。本網絡編程入門系列博客是連載學習的,有興趣的能夠看我博客其餘篇。。。。c++ 網絡編程課設入門超詳細教程 ---目錄

 

參考博客:https://blog.csdn.net/kaida1234/article/details/79465713

參考博客:http://www.runoob.com/cplusplus/cpp-multithreading.html

參考博客:https://blog.csdn.net/u010223072/article/details/49335867

參考博客:https://blog.csdn.net/wxf2012301351/article/details/73504281

參考書籍:《TCP/IP網絡編程 ---尹聖雨》

 

 

如有興趣交流分享技術,可關注本人公衆號,裏面會不按期的分享各類編程教程,和共享源碼,諸如研究分享關於c/c++,python,前端,後端,opencv,halcon,opengl,機器學習深度學習之類有關於基礎編程,圖像處理和機器視覺開發的知識

相關文章
相關標籤/搜索