C/C++協程學習筆記丨C/C++實現協程及原理分析視頻

協程,又稱微線程,纖程。英文名Coroutine。linux

協程的概念很早就提出來了,但直到最近幾年纔在某些語言(如Lua)中獲得普遍應用。算法

子程序,或者稱爲函數,在全部語言中都是層級調用,好比A調用B,B在執行過程當中又調用了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。數組

因此子程序調用是經過棧實現的,一個線程就是執行一個子程序。網絡

子程序調用老是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不一樣。數據結構

協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。多線程

注意,在一個子程序中中斷,去執行其餘子程序,不是函數調用,有點相似CPU的中斷。好比子程序A、B:def A():架構

print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'

假設由協程執行,在執行A的過程當中,能夠隨時中斷,去執行B,B也可能在執行過程當中中斷再去執行A,結果多是:socket

1
2
x
y
3
z

可是在A中是沒有調用B的,因此協程的調用比函數調用理解起來要難一些。ide

看起來A、B的執行有點像多線程,但協程的特色在因而一個線程執行,那和多線程比,協程有何優點?函數

最大的優點就是協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。

第二大優點就是不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。

由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。

Python對協程的支持還很是有限,用在generator中的yield能夠必定程度上實現協程。雖然支持不徹底,但已經能夠發揮至關大的威力了。

來看例子:

傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,經過鎖機制控制隊列和等待,但一不當心就可能死鎖。

若是改用協程,生產者生產消息後,直接經過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:import time

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)

執行結果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

注意到consumer函數是一個generator(生成器),把一個consumer傳入produce後:

  1. 首先調用c.next()啓動生成器;
  2. 而後,一旦生產了東西,經過c.send(n)切換到consumer執行;
  3. consumer經過yield拿到消息,處理,又經過yield把結果傳回;
  4. produce拿到consumer處理的結果,繼續生產下一條消息;
  5. produce決定不生產了,經過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個線程執行,produce和consumer協做完成任務,因此稱爲「協程」,而非線程的搶佔式多任務。

協程完整視頻連接以及文檔資料+qun720209036獲取

協程的實現與原理剖析訓練營

C/C++ 協程

首先須要聲明的是,這裏不打算花時間來介紹什麼是協程,以及協程和線程有什麼不一樣。若是對此有任何疑問,能夠自行 google。與 Python 不一樣,C/C++ 語言自己是不能自然支持協程的。現有的 C++ 協程庫均基於兩種方案:利用匯編代碼控制協程上下文的切換,以及利用操做系統提供的 API 來實現協程上下文切換。典型的例如:

  • libco,Boost.context:基於彙編代碼的上下文切換
  • phxrpc:基於 ucontext/Boost.context 的上下文切換
  • libmill:基於 setjump/longjump 的協程切換

通常而言,基於彙編的上下文切換要比採用系統調用的切換更加高效,這也是爲何 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的緣由。關於 phxrpc 和 libmill 具體的協程實現方式,之後有時間再詳細介紹。

libco 協程的建立和切換

在介紹 coroutine 的建立以前,咱們先來熟悉一下 libco 中用來表示一個 coroutine 的數據結構,即定義在 co_routine_inner.h 中的 stCoRoutine_t:

struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 協程運行環境
pfn_co_routine_t pfn; // 協程執行的邏輯函數
void *arg; // 函數參數
coctx_t ctx; // 保存協程的下文環境
...
char cEnableSysHook; // 是否運行系統 hook,即非侵入式邏輯
char cIsShareStack; // 是否在共享棧模式
void *pvEnv;
stStackMem_t* stack_mem; // 協程運行時的棧空間
char* stack_sp; // 用來保存協程運行時的棧空間
unsigned int save_size;
char* save_buffer;
};

咱們暫時只須要了解表示協程的最簡單的幾個參數,例如協程運行環境,協程的上下文環境,協程運行的函數以及運行時棧空間。後面的 stack_sp,save_size 和 save_buffer 與 libco 共享棧模式相關,有關共享棧的內容咱們後續再說

協程建立和運行

因爲多個協程運行於一個線程內部的,所以當建立線程中的第一個協程時,須要初始化該協程所在的環境 stCoRoutineEnv_t,這個環境是線程用來管理協程的,經過該環境,線程能夠得知當前一共建立了多少個協程,當前正在運行哪個協程,當前應當如何調度協程:

struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前建立的協程
int iCallStackSize; // 記錄當前一共建立了多少個協程
stCoEpoll_t *pEpoll; // 該線程的協程調度器
// 在使用共享棧模式拷貝棧內存時記錄相應的 coroutine
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

上述代碼代表 libco 容許一個線程內最多建立 128 個協程,其中 pCallStack[iCallStackSize-1] 也就是棧頂的協程表示當前正在運行的協程。當調用函數 co_create 時,首先檢查當前線程中的 coroutine env 結構是否建立。這裏 libco 對於每一個線程內的 stCoRoutineEnv_t 並無使用 thread-local 的方式(例如gcc 內置的 __thread,phxrpc採用這種方式)來管理,而是預先定義了一個大的數組,並經過對應的 PID 來獲取其協程環境。:

static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}

初始化 stCoRoutineEnv_t 時主要完成如下幾步:

  1. 爲 stCoRoutineEnv_t 申請空間而且進行初始化,設置協程調度器 pEpoll。
  2. 建立一個空的 coroutine,初始化其上下文環境( 有關 coctx 在後文詳細介紹 ),將其加入到該線程的協程環境中進行管理,而且設置其爲 main coroutine。這個 main coroutine 用來運行該線程主邏輯。

當初始化完成協程環境以後,調用函數 co_create_env 來建立具體的協程,該函數初始化一個協程結構 stCoRoutine_t,設置該結構中的各項字段,例如運行的函數 pfn,運行時的棧地址等等。須要說明的就是,若是使用了非共享棧模式,則須要爲該協程單獨申請棧空間,不然從共享棧中申請空間。棧空間表示以下:

struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 使用該棧的協程
int stack_size; // 棧大小
char* stack_bp; // 棧的指針,棧從高地址向低地址增加
char* stack_buffer; // 棧底
};

使用 co_create 建立完一個協程以後,將調用 co_resume 來將該協程激活運行:

void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 獲取當前正在運行的協程的結構
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 爲將要運行的 co 佈置上下文環境
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co爲運行的線程
co_swap( lpCurrRoutine, co );
}

函數 co_swap 的做用相似於 Unix 提供的函數 swapcontext:將當前正在運行的 coroutine 的上下文以及狀態保存到結構 lpCurrRoutine 中,而且將 co 設置成爲要運行的協程,從而實現協程的切換。co_swap 具體完成三項工做:

  1. 記錄當前協程 curr 的運行棧的棧頂指針,經過 char c; curr_stack_sp=&c 實現,當下次切換回 curr時,能夠從該棧頂指針指向的位置繼續,執行完 curr 後能夠順利釋放該棧。
  2. 處理共享棧相關的操做,而且調用函數 coctx_swap 來完成上下文環境的切換。注意執行完 coctx_swap以後,執行流程將跳到新的 coroutine 也就是 pending_co 中運行,後續的代碼須要等下次切換回 curr 時纔會執行。
  3. 當下次切換回 curr 時,處理共享棧相關的操做。

對應於 co_resume 函數,協程主動讓出執行權則調用 co_yield 函數。co_yield 函數調用了 co_yield_env,將當前協程與當前線程中記錄的其餘協程進行切換:

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}

前面咱們已經提到過,pCallStack 棧頂所指向的即爲當前正在運行的協程所對應的結構,所以該函數將 curr 取出來,並將當前正運行的協程上下文保存到該結構上,並切換到協程 last 上執行。接下來咱們以 32-bit 的系統爲例來分析 libco 是如何實現協程運行環境的切換的。

協程上下文的建立和切換

libco 使用結構 struct coctx_t 來表示一個協程的上下文環境:

struct coctx_t
{

if defined(__i386__)

void *regs[ 8 ];

else

void *regs[ 14 ];

endif

size_t ss_size;
char *ss_sp;
};

能夠看到,在 i386 的架構下,須要保存 8 個寄存器信息,以及棧指針和棧大小,究竟這 8 個寄存器如何保存,又是如何使用,須要配合後續的 coctx_swap 來理解。咱們首先來回顧一下 Unix-like 系統的 stack frame layout,若是不能理解這個,那麼剩下的內容就沒必要看了。

結合上圖,咱們須要知道關鍵的幾點:

  1. 函數調用棧是調用者和被調用者共同負責佈置的。Caller 將其參數從右向左反向壓棧,再將調用後的返回地址壓棧,而後將執行流程交給 Callee。
  2. 典型的編譯器會將 Callee 函數彙編成爲以 push %ebp; move %ebp, %esp; sub $esp N; 這種形式開頭的彙編代碼。這幾句代碼主要目的是爲了方便 Callee 利用 ebp 來訪問調用者提供的參數以及自身的局部變量(以下圖)。
  3. 當調用過程完成清除了局部變量之後,會執行 pop %ebp; ret,這樣指令會跳轉到 RA 也就是返回地址上面執行。這一點也是實現協程切換的關鍵:咱們只須要將指定協程的函數指針地址保存到 RA 中,當調用完 coctx_swap 以後,會自動跳轉到該協程的函數起始地址開始運行

瞭解了這些,咱們就來看一下協程上下文環境的初始化函數 coctx_make:

int coctx_make( coctx_t ctx, coctx_pfn_t pfn, const void s, const void *s1 )
{
char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);
coctx_param_t param = (coctx_param_t)sp ;
param->s1 = s;
param->s2 = s1;
memset(ctx->regs, 0, sizeof(ctx->regs));
ctx->regs[ kESP ] = (char )(sp) - sizeof(void);
ctx->regs[ kEIP ] = (char*)pfn;
return 0;
}

這段代碼應該比較好理解,首先爲函數 coctx_pfn_t 預留 2 個參數的棧空間並對其到 16 字節,以後將實參設置到預留的棧上空間中。最後在 ctx 結構中填入相應的,其中記錄 reg[kEIP] 返回地址爲函數指針 pfn,記錄 reg[kESP] 爲得到的棧頂指針 sp 減去一個指針長度,這個減去的空間是爲返回地址 RA 預留的。當調用 coctx_swap 時,reg[kEIP] 會被放到返回地址 RA 的位置,待 coctx_swap 執行結束,天然會跳轉到函數 pfn 處執行。

coctx_swap(ctx1, ctx2) 在 coctx_swap.S 中實現。這裏能夠看到,該函數並無使用 push %ebp; move %ebp, %esp; sub $esp N; 開頭,所以棧空間分佈中不會出現 ebp 的位置。coctx_swap 函數主要分爲兩段,其首先將當前的上下文環境保存到 ctx1 結構中:

leal 4(%esp), %eax // eax = old_esp + 4
movl 4(%esp), %esp // 將 esp 的值設爲 &ctx1(即ctx1的地址)
leal 32(%esp), %esp // esp = (char*)&ctx1 + 32
pushl %eax // ctx1->regs[EAX] = %eax
pushl %ebp // ctx1->regs[EBP] = %ebp
pushl %esi // ctx1->regs[ESI] = %esi
pushl %edi // ctx1->regs[EDI] = %edi
pushl %edx // ctx1->regs[EDX] = %edx
pushl %ecx // ctx1->regs[ECX] = %ecx
pushl %ebx // ctx1->regs[EBX] = %ebx
pushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

這裏須要注意指令 leal 和 movl 的區別。leal 將 eax 的值設置成爲 esp 的值加 4,而 movl 將 esp 的值設爲 esp+4 所指向的內存上的值,也就是參數 ctx1 的地址。以後該函數將 ctx2 中記錄的上下文恢復到 CPU 寄存器中,並跳轉到其函數地址處運行:

movl 4(%eax), %esp // 將 esp 的值設爲 &ctx2(即ctx2的地址)
popl %eax // %eax = ctx1->regs[EIP],也就是 &pfn
popl %ebx // %ebx = ctx1->regs[EBP]
popl %ecx // %ecx = ctx1->regs[ECX]
popl %edx // %edx = ctx1->regs[EDX]
popl %edi // %edi = ctx1->regs[EDI]
popl %esi // %esi = ctx1->regs[ESI]
popl %ebp // %ebp = ctx1->regs[EBP]
popl %esp // %esp = ctx1->regs[ESP],即(char )(sp) - sizeof(void)
pushl %eax // RA = %eax = &pfn,注意此時esp已經指向了新的esp
xorl %eax, %eax // reset eax
ret

上面的代碼看起來可能有些繞:

  1. 首先 line 1 將 esp 設置爲參數 ctx2 的地址,後續的 popl 操做均在 ctx2 的內存空間上執行。
  2. line 2-9 將 ctx2->regs[] 中的內容恢復到相應的寄存器中。還記得在前面 coctx_make 中設置了 regs[EIP] 和 regs[ESP] 嗎?這裏恰好就對應恢復了相應的值。
  3. 當執行完 line 9 以後,esp 已經指向了 ctx2 中新的棧頂指針,因爲在 coctx_make 中預留了一個指針長度的 RA 空間,line 10 恰好將新的函數指針 &pfn 設置到該 RA 上。
  4. 最後執行 ret 指令時,函數流程將跳到 pfn 處執行。這樣,整個協程上下文的切換就完成了。

如何使用 libco

咱們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。 在 example_echosvr.cpp 的 main 函數中,主要執行以下幾步:

  1. 建立 socket,監聽在本機的 1024 端口,並設置爲非阻塞;
  2. 主線程使用函數 readwrite_coroutine 建立多個讀寫協程,調用 co_resume 啓動協程運行直到其掛起。這裏咱們忽略掉無關的多進程 fork 的過程;
  3. 主線程繼續建立 socket 接收協程 accpet_co,一樣調用 co_resume 啓動協程直到其掛起;
  4. 主線程調用函數 co_eventloop 實現事件的監聽和協程的循環切換;

函數 readwrite_coroutine 在外層循環中將新建立的讀寫協程都加入到隊列 g_readwrite 中,此時這些讀寫協程都沒有具體與某個 socket 鏈接對應,能夠將隊列 g_readwrite 當作一個 coroutine pool。當加入到隊列中以後,調用函數 co_yield_ct 函數讓出 CPU,此時控制權回到主線程。

主線程中的函數 co_eventloop 監聽網絡事件,未來自於客戶端新進的鏈接交由協程 accept_co 處理,關於 co_eventloop 如何喚醒 accept_co 的細節咱們將在後續介紹。accept_co 調用函數 accept_routine 接收新鏈接,該函數的流程以下:

  1. 檢查隊列 g_readwrite 是否有空閒的讀寫 coroutine,若是沒有,調用函數 poll 將該協程加入到 Epoll 管理的定時器隊列中,也就是 sleep(1000) 的做用;
  2. 調用 co_accept 來接收新鏈接,若是接收鏈接失敗,那麼調用 co_poll 將服務端的 listen_fd 加入到 Epoll 中來觸發下一次鏈接事件;
  3. 對於成功的鏈接,從 g_readwrite 中取出一個讀寫協程來負責處理讀寫;

再次回到函數 readwrite_coroutine 中,該函數會調用 co_poll 將新創建的鏈接的 fd 加入到 Epoll 監聽中,並將控制流程返回到 main 協程;當有讀或者寫事件發生時,Epoll 會喚醒對應的 coroutine ,繼續執行 read 函數以及 write 函數。

上面的過程大體說明了控制流程是如何在不一樣的協程中切換,接下來咱們介紹具體的實現細節,即如何經過 Epoll 來管理協程,以及如何對系統函數進行改造以知足 libco 的調用。

經過 Epoll 管理和喚醒協程

Epoll 監聽 FD

上一章節中介紹了協程能夠經過函數 co_poll 來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發時,再切換回來執行 read 或者 write 操做,從而實現由 Epoll 管理協程的功能。co_poll 函數原型以下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[],
nfds_t nfds, int timeout_ms)

stCoEpoll_t 是爲 libco 定製的 Epoll 相關數據結構,fds 是 pollfd 結構的文件句柄,nfds 爲 fds 數組的長度,最後一個參數表示定時器時間,也就是在 timeout 毫秒以後觸發處理這些文件句柄。這裏能夠看到,co_poll 可以同時將多個文件句柄同時加入到 Epoll 管理中。咱們先看 stCoEpoll_t 結構:

struct stCoEpoll_t
{
int iEpollFd; // Epoll 主 FD
static const int _EPOLL_SIZE = 1024 * 10; // Epoll 能夠監聽的句柄總數
struct stTimeout_t *pTimeout; // 時間輪定時器
struct stTimeoutItemLink_t *pstTimeoutList; // 已經超時的時間
struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件
co_epoll_res *result; // Epoll 返回的事件結果
};

以 stTimeout_ 開頭的數據結構與 libco 的定時器管理有關,咱們在後面介紹。co_epoll_res 是對 Epoll 事件數據結構的封裝,也就是每次觸發 Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,所以這裏也保留了 kevent 數據結構。

struct co_epoll_res
{
int size;
struct epoll_event *events; // for linux epoll
struct kevent *eventlist; // for Unix or MacOs kqueue
};

co_poll 實際是對函數 co_poll_inner 的封裝。咱們將 co_epoll_inner 函數的結構分爲上下兩半段。在上半段中,調用 co_poll 的協程 CC 將其須要監聽的句柄數組 fds 都加入到 Epoll 管理中,並經過函數 co_yield_env 讓出 CPU;當 main 協程的事件循環 co_eventloop 中觸發了 CC 對應的監聽事件時,會恢復 CC的執行。此時,CC 將開始執行下半段,即將上半段添加的句柄 fds 從 epoll 中移除,清理殘留的數據結構,下面的流程圖簡要說明了控制流的轉移過程:

有了上面的基本概念,咱們來看具體的實現細節。co_poll 首先在內部將傳入的文件句柄數組 fds 轉化爲數據結構 stPoll_t,這一步主要是爲了方便後續處理。該結構記錄了 iEpollFd,ndfs,fds 數組,以及該協程須要執行的函數和參數。有兩點須要說明的是:

  1. 對於每個 fd,爲其申請一個 stPollItem_t 來管理對應 Epoll 事件以及記錄回調參數。libco 在此作了一個小的優化,對於長度小於 2 的 fds 數組,直接在棧上定義相應的 stPollItem_t 數組,不然從堆中申請內存。這也是一種比較常見的優化,畢竟從堆中申請內存比較耗時;
  2. 函數指針 OnPollProcessEvent 封裝了協程的切換過程。當傳入指定的 stPollItem_t 結構時,便可喚醒對應於該結構的 coroutine,將控制權交由其執行;

co_poll 的第二步,也是最關鍵的一步,就是將 fd 數組所有加入到 Epoll 中進行監聽。協程 CC 會將每個 epoll_event 的 data.ptr 域設置爲對應的 stPollItem_t 結構。這樣當事件觸發時,能夠直接從對應的 ptr中取出 stPollItem_t 結構,而後喚醒指定協程。

若是本次操做提供了 Timeout 參數,co_poll 還會將協程 CC 本次操做對應的 stPoll_t 加入到定時器隊列中。這代表在 Timeout 定時觸發以後,也會喚醒協程 CC 的執行。當整個上半段都完成後,co_poll 當即調用 co_yield_env 讓出 CPU,執行流程跳轉回到 main 協程中。

從上面的流程圖中也能夠看出,當執行流程再次跳回時,代表協程 CC 添加的讀寫等監聽事件已經觸發,便可以執行相應的讀寫操做了。此時 CC 首先將其在上半段中添加的監聽事件從 Epoll 中刪除,清理殘留的數據結構,而後調用讀寫邏輯。

定時器實現

協程 CC 在將一組 fds 加入 Epoll 的同時,還能爲其設置一個超時時間。在超時時間到期時,也會再次喚醒 CC 來執行。libco 使用 Timing-Wheel 來實現定時器。關於 Timing-Wheel 算法,能夠參考,其優點是 O(1) 的插入和刪除複雜度,缺點是隻有有限的長度,在某些場合下不能知足需求。

回過去看 stCoEpoll_t 結構,其中 pTimeout 表明時間輪,經過函數 AllocateTimeout 初始化爲一個固定大小(60 1000)的數組。根據 Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員 pstTimeoutList 記錄在 co_eventloop 中發生超時的事件,而 pstActiveList 記錄當前活躍的事件,包括超時事件。這兩個結構都將在 co_eventloop 中進行處理。

下面咱們簡要分析一下加入定時器的實現:

int AddTimeout( stTimeout_t apTimeout, stTimeoutItem_t apItem,
unsigned long long allNow )
{
if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0; // 當前時間輪指針指向數組0
}
// 1. 當前時間不可能小於時間輪的基準時間
// 2. 加入的定時器的超時時間不能小於當前時間
if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
{
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過期間輪的大小
{
return __LINE__;
}
// 插入到時間輪盤的指定位置
AddTail( apTimeout->pItems +
(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );
return 0;
}

定時器的超時檢查在函數 co_eventloop 中執行。

EPOLL 事件循環

main 協程經過調用函數 co_eventloop 來監聽 Epoll 事件,並在相應的事件觸發時切換到指定的協程執行。有關 co_eventloop 與 應用協程的交互過程在上一節的流程圖中已經比較清楚了,下面咱們主要介紹一下 co_eventloop 函數的實現:

上文中也提到,經過 epoll_wait 返回的事件都保存在 stCoEpoll_t 結構的 co_epoll_res 中。所以 co_eventloop 首先爲 co_epoll_res 申請空間,以後經過一個無限循環來監聽全部 coroutine 添加的全部事件:

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
...
}

對於每個觸發的事件,co_eventloop 首先經過指針域 data.ptr 取出保存的 stPollItem_t 結構,並將其添加到 pstActiveList 列表中;以後從定時器輪盤中取出全部已經超時的事件,也將其所有添加到 pstActiveList 中,pstActiveList 中的全部事件都做爲活躍事件處理。

對於每個活躍事件,co_eventloop 將經過調用對應的 pfnProcess 也就是上圖中的OnPollProcessEvent 函數來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執行。

最後 co_eventloop 在調用時也提供一個額外的參數來供調用者傳入一個函數指針 pfn。該函數將會在每次循環完成以後執行;當該函數返回 -1 時,將會終止整個事件循環。用戶能夠利用該函數來控制 main 協程的終止或者完成一些統計需求。

相關文章
相關標籤/搜索