Linux - posix 進程間通訊

接下來則編寫程序來建立一個posix消息隊列:shell

這是爲何呢?其實在man幫助中有說明:編程

因此,修改一下Makefile文件,加上這個連接選項:緩存

posix消息隊列建立成功了,下面來查看一下:bash

這是爲何呢?緣由是因爲ipcs只能查看System v建立的消息隊列,而poix建立的消息隊列不能經過這個方式來查看,那到底存放在哪裏呢?仍是回到強大的man幫助手冊來找尋答案:服務器

因此接下來建立一個掛載點:數據結構

接下來將消息隊列虛擬文件系統掛載到該目錄下:多線程

因此來看一下目前咱們所建立的消息隊列的狀態:併發

若是要想再次看到,則需從新掛載一次既可。socket

在建立posix消息隊列時,咱們指定了一個消息隊列名字,以下:函數

其中該名字是有必定的規則的,因此接下來講明一下

接下來要學習的一個函數比較簡單,關閉消息隊列:

【說明】:它的更準備的含義是刪除一個鏈接數,直接鏈接數減爲0的時候才真正將文件刪除。

下面來編寫程序來使用一下:

其中stuct mq_attr結構體爲:

下面來編譯程序來獲取一下消息隊列的屬性:

接下來利用這個函數來向消息隊列中發送消息:

另外也能夠用咱們本身編寫的查看屬性的程序來查看此時的屬性:

如今建立的消息隊列中已經發送了一個消息了,接下來將從消息隊列中來獲取已發送的消息:

這是爲何呢?

而最大長度大小能夠經過mq_attr函數來獲取,因此修改一個代碼:

下面從新再來發送多個消息,再接收:

沒有消息了,接收則會被阻塞,從運行效果中能夠看出。

從函數字面意思來看是一個通知,當消息隊列從沒消息到有消息會獲得通知,這個是System V跟Posix消息隊列的一個很是明顯的區別,就是SystemV消息隊列是無法獲得這個通知的,而Posix消息隊列是能夠的,也就是目前消息隊列是空的,當某個進程或線程往消息隊列發送一條消息時,這時消息隊列則會給出通知,只要進程註冊了該通知事件,而註冊通知就能夠經過mq_notify函數完成,具體以下:

其中用到的結構體:

能夠查看一下man幫助:

這裏主要關注信號的方式,由於線程目前尚未學到:

下面編寫程序來演示一下通知的效果:

接下來註冊一個消息隊列的通知事件:

當收到通知時,這時來處理一下信號處理程序:

另外,爲了看到通知效果,這裏須要不讓進程退出,因此死循環一下:

從中發現,當空的消息隊列裏面新發送了一條消息,則立馬就收到通知了,那若是繼續再發送呢?

這是爲何呢?其實這個通知是有必定規則的,以下:

要想讓每次發送都收到通知,則上面的第三點則是解決方案,下面來修改一下程序:

從中能夠發現,此次確實是每次發送消息都能被通知到了,那規則中提到,從新註冊必須放到接收消息以前,不能放在以後:

這是爲何呢?這時由於當發送一個消息時,被接收了以後,消息隊列裏面就爲空了,這時再次註冊就沒法接收到通知了,因此得放在接收消息以前再次註冊。

最後來講明一下在上面提到過須要解釋的:

正好能夠利用這個通知程序來講明,以下:

下面編寫程序來建立一個共享內存:

那posix的共享內存存放在哪裏呢?上節中學的posix的消息隊列是在虛擬文件當中建立一個消息隊列,須要咱們手工將它掛載到某個目錄下才能看到,一樣的,posix共享內存也是須要將其掛載,只不過這個掛載操做是由系統完成的,而不用咱們人工去操做了,已經掛載到了/dev/shm下了,以下:

接下來要介紹的函數爲修改共享內存的大小:

【說明】:實際上ftruncate函數也能修改文件的大小。

下面修改程序來使用一下它:

其中struct stat的結構體爲:

修改程序以下:

如今咱們已經建立了一個共享內存對象,那如何用它呢?則須要用到下面這個函數才行:

下面來使用一下,映射成功以後,先往內存中寫入數據,而後再從內存中來讀取:

接下來作一個容錯處理:

實際上映射失敗有專門的宏定義,從man幫助中能夠得知:

因此,用它來代替-1程序會更加可讀:

這是爲何呢?仍是從man幫助中來尋找答案:

因此問題緣由找到了,則修改一下打開方式既可:

有沒有成功寫入,則須要編寫一個讀取程序來驗證一下:

實際上能夠用過shell命令直接查看共享內存的內容:

【注意】:建立失敗這時會返回錯誤碼,而一般函數建立失敗都會返回-1,而後錯誤碼會保存在errno當中。

在處理線程建立失敗檢查時,下面來看一下檢查錯誤的一些說明:

因此下面來處理一下線程建立失敗的錯誤:

並且每一個線程都有本身的一個errono,避免多線程時有衝突。

接下來作這樣的一個操做,就是主線程打印A字符,而後新建立的線程打印B字符,

【注意】:新建立的線程不叫作子線程,由於並無父子關係,可是能夠把初始的線程叫主線程,以下:

這是因爲主線程已經結束了,而新建立的線程尚未被調度到,因此就沒有打印出B,因此解決此問題的辦法可能讓主線程小睡一會:

可見新建立的線程被調度到了,實際上主線程跟新建立的線程是交替運行的,下面修改下程序來講明下:

從中能夠發現每次運行的結果都不同,這個取決於系統是如何調度線程的。

另外有這樣的一個問題,就是可能新建立的線程尚未執行完畢,主線程就已經執行完畢了,也就是主線程須要睡眠去等待新線程執行完,下面屢次運行一下,看可否看到這種現象:

其中在主線程中睡眠是一種解決方案,可是比較笨,有沒有一個函數可以等待新建立的線程結束呢?其實是有的,就好像進程同樣,有waitpid來等待子進程的退出:

確實是達到了等待新建立線程退出的目的,下面再來學習一個函數:

固然線程的退出也能夠是執行完了再退出,以下:

其中線程結束包含兩種狀況:

①、自殺:調用pthread_exit();在線程入口函數中調用return。

②、他殺:調用pthread_cancel()。

若是在新建立的線程中調用此方法,若是主線程沒有調用pthread_join的狀況下,也能避免僵線程。

下面用線程的方式來改造一下以前用進程的方式實現的回射客戶/服務器程序,來進一步熟悉線程的使用:

客戶端echocli.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)


void echo_cli(int sock)
{
    char sendbuf[1024] = {0};
        char recvbuf[1024] ={0};
        while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
        {
                write(sock, sendbuf, strlen(sendbuf));
                read(sock, recvbuf, sizeof(recvbuf));

                fputs(recvbuf, stdout);
                memset(sendbuf, 0, sizeof(sendbuf));
                memset(recvbuf, 0, sizeof(recvbuf));
        }

        close(sock);
}

int main(void)
{
    int sock;
    if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        ERR_EXIT("socket");

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(5188);
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        ERR_EXIT("connect");

    echo_cli(sock);

    return 0;
}
複製代碼

服務端echosrv.c:

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

void echo_srv(int conn)
{
    char recvbuf[1024];
        while (1)
        {
                memset(recvbuf, 0, sizeof(recvbuf));
                int ret = read(conn, recvbuf, sizeof(recvbuf));
        if (ret == 0)
        {
            printf("client close\n");
            break;
        }
        else if (ret == -1)
            ERR_EXIT("read");
                fputs(recvbuf, stdout);
                write(conn, recvbuf, ret);
        }
}int main(void)
{
    int listenfd;
    if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
        ERR_EXIT("socket");

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(5188);
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
        ERR_EXIT("setsockopt");

    if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        ERR_EXIT("bind");
    if (listen(listenfd, SOMAXCONN) < 0)
        ERR_EXIT("listen");

    struct sockaddr_in peeraddr;
    socklen_t peerlen = sizeof(peeraddr);
    int conn;

    while (1)
    {
        if ((conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen)) < 0)
            ERR_EXIT("accept");

        printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));

        pid = fork();
        if (pid == -1)
            ERR_EXIT("fork");

        if (pid == 0)
        {//子進程
            close(listenfd);//不須要處理監聽
            echo_srv(conn);
            exit(EXIT_SUCCESS);
        } else {
            close(conn);//父進程不須要處理鏈接    
        }
    }
    
    return 0;
}
複製代碼

接下來要來進行服務端改造:

【注意】:因爲是單進程,因此就不必像建立進程的方式要關閉conn了,編程也簡單了許多。

從中能夠發現程序正常運轉,並且當客戶端退出時,相應的線程也退出了,這是爲何呢?

另外當線程退出了以後,其實該線程是屬於一個僵線程的狀態,由於在主線程中並無調用pthread_join()來等待新建立線程的退出,因此得避免僵線程的出現,修改代碼以下:

關於這個程序還有一個細節須要探討一下,以下:

那我能夠這樣寫麼?

此時線程入口函數也得發生改變:

看似一切正常,固然確定是有問題的,否則也不會換一種寫法來進行說明了,有什麼潛在風險呢?

這就是典型的Race Condition(也叫作資源競爭)問題。因此說conn只能值傳遞,而不能是傳遞指針,仍是將代碼還原,下面來討論另一個細節問題:

那如何解決呢?能夠採用動態申請內存的方式:

進程跟線程之間的對比圖

pthread_att_t屬性變量是須要進行初始化纔可以用的,必定初始化了屬性變量,它就包含了線程的多種屬性的值,那到底有哪些屬性了,下面一一來介紹:

其中第二個參數的指定值能夠經過man幫助來了解到:

下面用程序來實驗一下:

【注意】:在設置棧大小時,通常第二個參數設置爲0表示用系統定義的棧的大小,若是指定咱們本身設定的棧的大小可能會致使一些移植性的問題,因此通常狀況下棧的大小不會去設置。

首先須要瞭解線程調度競爭範圍:

那默認線程是什麼競爭範圍呢?用程序來查看:

這意味着新建立的線程跟調者用線程是不是同樣的調度策略,若是設置成繼承的則擁有同樣的調度策略:

其中調到了線程模型,這裏介紹一下,其實線程模型有三種:

其中須要說明一下,int pthread_setconcurrency(int new_level)設置併發級別,並不意味着線程的併發數是new_level,僅僅只是設置了一個併發級別,而且只是給內核一個提示而已,並不是真正的提供new_level個核心線程來映射用戶線程:

上面的這些概念仍是有些生澀,下面來用一個實例程序來進一步理解,在寫程序以前,須要用到特定數據的一些函數:

找一個空位來建立特定數據:

刪除特定數據:

給特定數據設定值及獲取特定數據裏面的值:

而後再建立兩個線程出來,來使用特定數據:

接下來編寫線程處理函數:

從結果來看:

下面再來介紹一對函數,以下:

它表明init_routine函數只在第一個線程進入的時候被執行一次,下面來修改一下程序:

那若是但願只有第一個線程進來時建立,而其它線程進來再也不建立,那這個函數就派上用場了,修改程序以下:

跟posix消息隊列,共享內存的打開,關閉,刪除操做同樣,不過,上面的函數是對有名信號量進行操做,經過man幫助能夠得知:

有名信號量相對的那就是無名信號量,對於它相關的函數以下:

一樣能夠查看man幫助:

【思考】:是否是無名信號量就沒法用於不一樣進程間的多個線程間進行通訊呢?實際上不是這樣的:

而對於信號量的P、V操做,能夠用如下兩個函數,既能用於有名,也能用於無名信號量:

初始化互斥鎖:

鎖定操做:

解鎖操做:

鎖屏互斥鎖:

【說明】:以上四個函數也是應用於無名的,也能夠用於不一樣進程的不一樣線程間進行通訊。

接下來就用信號量與互斥鎖來解決生產者消費者的問題:

下面利用posix信號量與互斥鎖來模擬生產者消費者問題:

因爲生產者與消費者能夠有多個,因此這兩個的個數能夠定義成一個宏,便於隨意更改:

接下來要定義一些信號量和互斥鎖變量:

以上是一些全局數據的初始化,接下來則開始真正代碼的編寫,首先得初始化信號量和互斥鎖:

接下來建立若干個線程:

接下來來編寫生產者與消費者的入口函數的實現:

先來實現生產產品的代碼:

在正式生產以前,先打印出倉庫當前的狀態,也就是緩衝區裏:

一樣的,在消費以前,也打印一下當前倉庫消費的狀態:

打印狀態以後,則開始生產產品:

一樣的消費者也相似:

下面則經過調整生產者與消費者的個數,再配合睡眠來查看一下運行結果:

狀況一:生產產品比較快,消費產品比較慢,因此常常有產品滿的狀況,也就是生產者會出現等待。

從結果中來以發現:

狀況二:生產產品比較慢,可是消費得比較快,因此常常出現產品爲空的狀況,也就是消費者會不斷出現等待。

從中能夠發現:

也就是說若是對某個臨界區施加了共享鎖,意味着還能夠對其施加共享鎖;而若是對臨界區施加了共享鎖或排它鎖,則不容許其它線程對它施加排它鎖。

#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <semaphore.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT+PRODUCERS_COUNT];

void* consume(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);
        //消費產品
        for (i=0; i<BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == out)
                printf("\t<--consume");

            printf("\n");
        }

        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);

        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);

        sleep(1);
    }
    return NULL;
}

void* produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        //生產產品的代碼
        for (i=0; i<BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == in)
                printf("\t<--produce");

            printf("\n");
        }
        
        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);

        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i=0; i<BUFFSIZE; i++)
        g_buffer[i] = -1;

    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

    pthread_mutex_init(&g_mutex, NULL);

    for (i=0; i<CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void*)i);

    for (i=0; i<PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT+i], NULL, produce, (void*)i);
    
    for (i=0; i<CONSUMERS_COUNT+PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

    return 0;
}
複製代碼

下面用一個圖來進一步描述條件變量的做用:

爲何呢?

這實際上能夠解決生產者與消費者問題,並且對於緩衝區是無界的是一種比較理解的解決方案,只有有產品時才通知消費者開始消費產品,生產者不關心緩存區是否滿,後面會用條件變量與互斥鎖來解決生產者與消費者問題。

下面則根據上面的使用規範來解決生產者與消費者問題:

【說明】:這裏並無用到緩衝區,而是隻要發現條件不知足則等待,直接條件知足才消費,因此實現了一個無界的緩衝區,另外nready來簡單模擬產品。

另外爲了首次讓消費者進行等待,在建立消費者線程以後小睡一會:

下面也來分幾種狀況來查看消費者與生產者之間的關係:

①、消費得比較快,生產得比較慢

②、生產速度比較快,消費得比較慢:

爲啥以後沒有等待線程呢?這是因爲消費者的個數不如生產者線程的個數,消費速度不夠快,結合代碼來解釋:

因此,當消費者比生產者少時,等待的機率就會少不少。

以上就是利用條件變量與互斥鎖來解決生產者與消費者問題,下面來理解一些細節,也是理解代碼很關鍵的地方:

它主要是作了下面三件事:

一、對g_mutex進行解鎖。

爲何要先進行解鎖呢?

二、等待條件,直到有線程向它發起通知。

三、從新對g_mutex進行加鎖操做。

這三者構成了一個pthread_cond_wait原語,條件變量的使用最難的地方就是這個函數隱藏動做的理解,需細細體會下。

在上面留了一個問題,就是:

能夠從man幫助中尋找到答案:

上次中已經用互斥鎖與條件變量來改造了生產者與消費者問題,此次利用它來實現一個線程池,增強對條件變量及互斥鎖的認識,下面開始:

關於什麼是線程池,這裏就很少說了,應該基本都在實際中用到過,下面關於線程池實現有幾個點須要說明一下:

線程池,顧名思義就是擁有若干個線程,對於線程的個數是有嚴格的要求的,並不是越多越好,太多了會增長系統的開銷,太少了又會下降併發量。

執行時間較長的任務是不太適合放在線程池中進行處理,好比說:線程的執行時間跟進程的生命週期是一致的,那麼這個任務的執行就不必放到線程池中進行,直接用普通的線程既可。

那線程池當中的線程個數究竟存放多少個比較合適呢?實際上這跟任務類型有關係:

①、計算密集型任務:通常這個任務是佔用CPU的,它不多被外界的事件打斷,這時線程個數 = CPU個數,若是線程個數>CPU個數,因爲CPU的個數是必定的,那麼可以併發的數目也是必定的,因此會用少許的CPU個數來調度多個線程,這確定會涉及到線程與線程之間的切換開銷,於是會下降效率。

②、I/O密集型任務:這種任務在運行時,可能會被I/O中斷,也就是說這個線程會掛起,這時線程個數 > CPU個數,

那接下來先了解一下線程池實現中,須要用到的結構體:

下面則開始實現,首先在頭文件中定義上面的數據結構:

其中用到了條件變量,這裏對條件變量進行了簡單的封裝,因此先來看下是如何封裝的:

condition.c:

#include "condition.h"

//初使化條件變量,可想而知是對互斥鎖和條件變量進行初始化
int condition_init(condition_t *cond)
{
    int status;
    if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
        return status;

    if ((status = pthread_cond_init(&cond->pcond, NULL)))
        return status;

    return 0;
}

//對互斥鎖進行鎖定
int condition_lock(condition_t *cond)
{
    return pthread_mutex_lock(&cond->pmutex);
}

//對互斥鎖進行解鎖
int condition_unlock(condition_t *cond)
{
    return pthread_mutex_unlock(&cond->pmutex);
}

//在條件變量上等待條件
int condition_wait(condition_t *cond)
{
    return pthread_cond_wait(&cond->pcond, &cond->pmutex);
}

//具備超時功能的等待功能
int condition_timedwait(condition_t *cond, const struct timespec *abstime)
{
    return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
}

//向等待線程發起一個通知
int condition_signal(condition_t *cond)
{
    return pthread_cond_signal(&cond->pcond);
}

//向等待線程發起廣播
int condition_broadcast(condition_t* cond)
{
    return pthread_cond_broadcast(&cond->pcond);
}

//銷燬條件變量
int condition_destroy(condition_t* cond)
{
    int status;
    if ((status = pthread_mutex_destroy(&cond->pmutex)))
        return status;

    if ((status = pthread_cond_destroy(&cond->pcond)))
        return status;

    return 0;
}
複製代碼

接着來實現一下線程池:

在正式實現這些函數以前,其實能夠先從使用者的角度來看,如何使用這些線程池,以下:

其實這是典型的「測試驅動開發」,先編寫好測試代碼,而後再來從使用的角度去具體實現,下面則開始具體實現線程池相應的方法:

接下來實現往線程池中添加任務:

其添加過程是從尾部進行添加的,其實就是單鏈表的應用。

這裏須要注意一個問題,就是在使用條件變量以前是須要對進行互斥的,由於隊列資源是生產者與消費者均可以訪問的,因此須要互斥:

接下來來處理線程的執行入口函數,線程應該是等待任務而且處理任務,也就是它是一個消費者線程:

下面來編譯運行一下,在運行以後,須要在main函數中作一下sleep:

並且是通過15秒以後,則進程退出了,可是有個問題,就是當任務執行完了,應該線程也能動態減小,目前當任務執行完了以後,全部線程都還在,也就是須要看到這樣的輸出:

可是目前看不到這樣的狀態,而是等到進程退出來線程才銷燬,因此須要對代碼進行改進,這時就須要用到等待超時的一個函數:

也就是若是線程等待超時了,則表明沒有任務了,則該線程就能夠銷燬了,因此將condition_wait須要換成condition_timedwait函數:

查看一相man幫助:

【說明】:獲取當前時間能夠用函數clock_gettime:

下面再來作超時處理:

接下來就剩最後一個銷燬方法沒有實現了,而main中的sleep則能夠去掉了:

其中看到有等待條件變量,那是誰來通知該條件變量呢,固然是在任務執行時,於時須要修改任務執行線程裏面的代碼:

下面再來編譯運行一下,在運行以前,能夠將以前的休眠代碼去掉了:

可見當線程任務都執行完了,全部的線程也銷燬了

相關文章
相關標籤/搜索