比物理線程都好用的C++20的協程,你會用嗎?

摘要:事件驅動(event driven)是一種常見的代碼模型,其一般會有一個主循環(mainloop)不斷的從隊列中接收事件,而後分發給相應的函數/模塊處理。常見使用事件驅動模型的軟件包括圖形用戶界面(GUI),嵌入式設備軟件,網絡服務端等。

本文分享自華爲雲社區《C++20的協程在事件驅動代碼中的應用》,原文做者:飛得樂 。php

嵌入式事件驅動代碼的難題

事件驅動(event driven)是一種常見的代碼模型,其一般會有一個主循環(mainloop)不斷的從隊列中接收事件,而後分發給相應的函數/模塊處理。常見使用事件驅動模型的軟件包括圖形用戶界面(GUI),嵌入式設備軟件,網絡服務端等。ios

本文以一個高度簡化的嵌入式處理模塊作爲事件驅動代碼的例子:假設該模塊須要處理用戶命令、外部消息、告警等各類事件,並在主循環中進行分發,那麼示例代碼以下:c++

#include <iostream>
#include <vector>

enum class EventType {
    COMMAND,
    MESSAGE,
    ALARM
};

// 僅用於模擬接收的事件序列
std::vector<EventType> g_events{EventType::MESSAGE, EventType::COMMAND, EventType::MESSAGE};

void ProcessCmd()
{
    std::cout << "Processing Command" << std::endl;
}

void ProcessMsg()
{
    std::cout << "Processing Message" << std::endl;
}

void ProcessAlm()
{
    std::cout << "Processing Alarm" << std::endl;
}

int main() 
{
    for (auto event : g_events) {
        switch (event) {
            case EventType::COMMAND:
                ProcessCmd();
                break;
            case EventType::MESSAGE:
                ProcessMsg();
                break;
            case EventType::ALARM:
                ProcessAlm();
                break;
        }
    }
    return 0;
}

這只是一個極簡的模型示例,真實的代碼要遠比它複雜得多,可能還會包含:從特定接口獲取事件,解析不一樣的事件類型,使用表驅動方法進行分發……不過這些和本文關係不大,可暫時先忽略。git

用順序圖表示這個模型,大致上是這樣:
image.png程序員

在實際項目中,經常碰到的一個問題是:有些事件的處理時間很長,好比某個命令可能須要批量的進行上千次硬件操做:github

void ProcessCmd()
{
    for (int i{0}; i < 1000; ++i) {
        // 操做硬件接口……
    }
}

這種事件處理函數會長時間的阻塞主循環,致使其餘事件一直排隊等待。若是全部事件對響應速度都沒有要求,那也不會形成問題。可是實際場景中常常會有些事件是須要及時響應的,好比某些告警事件出現後,須要很快的執行業務倒換,不然就會給用戶形成損失。這個時候,處理時間很長的事件就會產生問題。
image.png編程

有人會想到額外增長一個線程專用於處理高優先級事件,實踐中這確實是個經常使用方法。然而在嵌入式系統中,事件處理函數會讀寫不少公共數據結構,還會操做硬件接口,若是併發調用,極容易致使各種數據競爭和硬件操做衝突,並且這些問題經常很難定位和解決。那在多線程的基礎上加鎖呢?——設計哪些鎖,加在哪些地方,也是很是燒腦並且容易出錯的工做,若是互斥等待過多,還會影響性能,甚至出現死鎖等麻煩的問題。segmentfault

另外一種解決方案是:把處理時間很長的任務切割成不少個小任務,並從新加入到事件隊列中。這樣就不會長時間的阻塞主循環。這個方案避免了併發編程產生的各類頭疼問題,可是卻帶來另外一個難題:如何把一個大流程切割成不少獨立小流程?在編碼時,這須要程序員解析函數流程的全部上下文信息,設計數據結構單獨存儲,並創建關聯這些數據結構的特殊事件。這每每會帶來幾倍的額外代碼量和工做量。promise

這個問題幾乎在全部事件驅動型軟件中都會存在,但在嵌入式軟件中尤其突出。這是由於嵌入式環境下的CPU、線程等資源受限,而實時性要求高,併發編程受限。網絡

C++20語言給這個問題提供了一種新的解決方案:協程。

C++20的協程簡介

關於協程(coroutine)是什麼,在wikipedia[1]等資料中有很好的介紹,本文就不贅述了。在C++20中,協程的關鍵字只是語法糖:編譯器會將函數執行的上下文(包括局部變量等)打包成一個對象,並讓未執行完的函數先返回給調用者。以後,調用者使用這個對象,可讓函數從原來的「斷點」處繼續往下執行。

使用協程,編碼時就再也不須要費心費力的去把函數「切割」成多個小任務,只用按照習慣的流程寫函數內部代碼,並在容許暫時中斷執行的地方加上co_yield語句,編譯器就能夠將該函數處理爲可「分段執行」。

協程用起來的感受有點像線程切換,由於函數的棧幀(stack frame)被編譯器保存成了對象,能夠隨時恢復出來接着往下運行。可是實際執行時,協程其實仍是單線程順序運行的,並無物理線程切換,一切都只是編譯器的「魔法」。因此用協程能夠徹底避免多線程切換的性能開銷以及資源佔用,也不用擔憂數據競爭等問題。

惋惜的是,C++20標準只提供了協程基礎機制,並未提供真正實用的協程庫(在C++23中可能會改善)。目前要用協程寫實際業務的話,能夠藉助開源庫,好比著名的cppcoro[2]。然而對於本文所述的場景,cppcoro也沒有直接提供對應的工具(generator通過適當的包裝能夠解決這個問題,可是不太直觀),所以我本身寫了一個切割任務的協程工具類用於示例。

自定義的協程工具

下面是我寫的SegmentedTask工具類的代碼。這段代碼看起來至關複雜,可是它做爲可重用的工具存在,沒有必要讓程序員都理解它的內部實現,通常只要知道它怎麼用就好了。SegmentedTask的使用很容易:它只有3個對外接口:Resume、IsFinished和GetReturnValue,其功能可根據接口名字自解釋。

#include <optional>
#include <coroutine>

template<typename T>
class SegmentedTask {
public:
    struct promise_type {
        SegmentedTask<T> get_return_object() 
        {
            return SegmentedTask{Handle::from_promise(*this)};
        }

        static std::suspend_never initial_suspend() noexcept { return {}; }
        static std::suspend_always final_suspend() noexcept { return {}; }
        std::suspend_always yield_value(std::nullopt_t) noexcept { return {}; }

        std::suspend_never return_value(T value) noexcept
        {
            returnValue = value;
            return {};
        }

        static void unhandled_exception() { throw; }

        std::optional<T> returnValue;
    };
 
    using Handle = std::coroutine_handle<promise_type>;
 
    explicit SegmentedTask(const Handle coroutine) : coroutine{coroutine} {}
 
    ~SegmentedTask() 
    { 
        if (coroutine) {
            coroutine.destroy(); 
        }
    }
 
    SegmentedTask(const SegmentedTask&) = delete;
    SegmentedTask& operator=(const SegmentedTask&) = delete;
 
    SegmentedTask(SegmentedTask&& other) noexcept : coroutine(other.coroutine) { other.coroutine = {}; }

    SegmentedTask& operator=(SegmentedTask&& other) noexcept
    {
        if (this != &other) {
            if (coroutine) {
                coroutine.destroy();
            }
            coroutine = other.coroutine;
            other.coroutine = {};
        }
        return *this;
    }

    void Resume() const { coroutine.resume(); }
    bool IsFinished() const { return coroutine.promise().returnValue.has_value(); }
    T GetReturnValue() const { return coroutine.promise().returnValue.value(); }
 
private:
    Handle coroutine;
};

本身編寫協程的工具類不光須要深刻了解C++協程機制,並且很容易產生懸空引用等未定義行爲。所以強烈建議項目組統一使用編寫好的協程類。若是讀者想深刻學習協程工具的編寫方法,能夠參考Rainer Grimm的博客文章[3]。

接下來,咱們使用SegmentedTask來改造前面的事件處理代碼。當一個C++函數中使用了co_await、co_yield、co_return中的任何一個關鍵字時,這個函數就變成了協程,其返回值也會變成對應的協程工具類。在示例代碼中,須要內層函數提早返回時,使用的是co_yield。可是C++20的co_yield後必須跟隨一個表達式,這個表達式在示例場景下並不必,就用了std::nullopt讓其能編譯經過。實際業務環境下,co_yield能夠返回一個數字或者對象用於表示當前任務執行的進度,方便外層查詢。

協程不能使用普通return語句,必須使用co_return來返回值,並且其返回類型也不直接等同於co_return後面的表達式類型。

enum class EventType {
    COMMAND,
    MESSAGE,
    ALARM
};

std::vector<EventType> g_events{EventType::COMMAND, EventType::ALARM};
std::optional<SegmentedTask<int>> suspended;  // 沒有執行完的任務保存在這裏

SegmentedTask<int> ProcessCmd()
{
    for (int i{0}; i < 10; ++i) {
        std::cout << "Processing step " << i << std::endl;
        co_yield std::nullopt;
    }
    co_return 0;
}

void ProcessMsg()
{
    std::cout << "Processing Message" << std::endl;
}

void ProcessAlm()
{
    std::cout << "Processing Alarm" << std::endl;
}

int main()
{
    for (auto event : g_events) {
        switch (event) {
            case EventType::COMMAND:
                suspended = ProcessCmd();
                break;
            case EventType::MESSAGE:
                ProcessMsg();
                break;
            case EventType::ALARM:
                ProcessAlm();
                break;
        }
    }
    while (suspended.has_value() && !suspended->IsFinished()) {
        suspended->Resume();
    }
    if (suspended.has_value()) {
        std::cout << "Final return: " << suspended->GetReturnValue() << endl;
    }
    return 0;
}

出於讓示例簡單的目的,事件隊列中只放入了一個COMMAND和一個ALARM,COMMAND是能夠分段執行的協程,執行完第一段後,主循環會優先執行隊列中剩下的事件,最後再來繼續執行COMMAND餘下的部分。實際場景下,可根據須要靈活選擇各類調度策略,好比專門用一個隊列存放全部未執行完的分段任務,並在空閒時依次執行。

本文中的代碼使用gcc 10.3版本編譯運行,編譯時須要同時加上-std=c++20和-fcoroutines兩個參數才能支持協程。代碼運行結果以下:

Processing step 0
Processing Alarm
Processing step 1
Processing step 2
Processing step 3
Processing step 4
Processing step 5
Processing step 6
Processing step 7
Processing step 8
Processing step 9
Final return: 0

能夠看到ProcessCmd函數(協程)的for循環語句並無一次執行完,在中間插入了ProcessAlm的執行。若是分析運行線程還會發現,整個過程當中並無物理線程的切換,全部代碼都是在同一個線程上順序執行的。

使用了協程的順序圖變成了這樣:
image.png

事件處理函數的執行時間長再也不是問題,由於能夠中途「插入」其餘的函數運行,以後再返回斷點繼續向下運行。

總結

一個較廣泛的認識誤區是:使用多線程能夠提高軟件性能。但事實上,只要CPU沒有空跑,那麼當物理線程數超過了CPU核數,就再也不會提高性能,相反還會因爲線程的切換開銷而下降性能。大多數開發實踐中,併發編程的主要好處並不是爲了提高性能,而是爲了編碼的方便,由於現實中的場景模型不少都是併發的,容易直接對應成多線程代碼。

協程能夠像多線程那樣方便直觀的編碼,可是同時又沒有物理線程的開銷,更沒有互斥、同步等併發編程中使人頭大的設計負擔,在嵌入式應用等不少場景下,經常是比物理線程更好的選擇。

相信隨着C++20的逐步普及,協程未來會獲得愈來愈普遍的使用。

尾註

[1] https://en.wikipedia.org/wiki...
[2] https://github.com/lewissbake...
[3] https://www.modernescpp.com/i...

點擊關注,第一時間瞭解華爲雲新鮮技術~

相關文章
相關標籤/搜索