State-Thread(如下簡稱st),是一個由C語言編寫的小巧、簡潔卻高效的開源協程庫。這個庫基於單線程運做、不強制佔用用戶線程,給予了開發者最大程度的輕量級和較低的侵入性。本篇文章中,網易雲信音視頻研發大神將爲你們簡要分析State-Thread,歡迎你們積極留言,和咱們共同討論。
在開始這個話題以前,咱們先來聊一聊協程。
什麼是協程?
協程是一種程序組件。一般咱們把協程理解爲是一種程序本身實現調度、用於提升運行效率、下降開發複雜度的東西。提升運行效率很好理解,由於在程序層本身完成了部分的調度,下降了對系統調度的依賴,減小了大量的中斷和換頁操做。而下降了開發複雜度,則是指對於開發者而言,可使用同步的方式去進行代碼開發(不須要考慮異步模型的諸多回調),也不須要考慮多線程模型的線程調度和諸多的臨界資源問題。
不少語言都擁有協程,例如python或者golang。而對於c/c++而言,一般實現協程的常見方式,一般是依賴於glibc提供的setjump&longjump或者基於彙編語言,固然還有基於語義實現(protothread)。linux上使用協程庫的方式,一般也會分爲替換函數和更爲暴力的替換so來實現。固然而各類方式有各自的優劣。而st選用的彙編語言實現setjump&longjump和要求用戶調用st_打頭的函數來嵌入程序。因此st具有了跨平臺的能力,以及讓開發者們更開心的「與容許調用者自行選擇切換時機」的能力。
st到底是如何實現了這一切?
首先咱們先看看st的總體工做流程:python
在宏觀的來看,ST的結構主要分紅:
vp_schedule。主要是負責了一個調度的能力。有點相似於linux內核當中的schedule()函數。每次當這個函數被調用的時候,都會完成一次線程的切換。
各類Queue。用於保存各類狀態下等待被調度協程(st_thread)
Timer。用於記錄各類超時和sleep。
poll。用於監聽各類io事件,會根據系統能力不一樣而進行切換(kqueue、epoll、poll、select)。
st_thread。用於保存各類協程的信息。
其中比較重要的是schedule模塊和thread模塊二者。這二者實現了一個完整的協程切換和調度。屬於st的核心。而schedule部分一般是開發者們最須要關心的部分。
接下來咱們會深刻到代碼層,看一下具體在這個過程裏作了些什麼。
一般對於st而言,全部暴露給用戶的除了init函數,就是一系列的st_xxx函數了。那麼先看看init函數。
int st_init(void)
{
_st_thread_t *thread;linux
if (_st_active_count) {
/ Already initialized /
return 0;
}c++
/ We can ignore return value here /
st_set_eventsys(ST_EVENTSYS_DEFAULT);golang
if (_st_io_init() < 0)
return -1;多線程
memset(&_st_this_vp, 0, sizeof(_st_vp_t));架構
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);框架
if ((*_st_eventsys->init)() < 0)
return -1;異步
_st_this_vp.pagesize = getpagesize();
_st_this_vp.last_clock = st_utime();socket
/*函數
*/
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
_ST_DEL_RUNQ(_st_this_vp.idle_thread);
/*
*/
thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +
(ST_KEYS_MAX sizeof(void )));
if (!thread)
return -1;
thread->private_data = (void **) (thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;
return 0;
}
這段函數一共作了3事情,建立了一個idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、
_ST_ZOMBIEQ三個隊列,把當前調用者初始化成原始函數(一般st_init會在main裏面調用,因此這個原始的thread至關因而主線程)。idle_thread函數,其實就是整個IO和定時器相關的本體函數了。st會在每一次_ST_RUNQ運行完成後,調用idle_thread來獲取可讀寫的io和定時器。這個咱們後續再說。
那麼,st_xxx通常會分紅io類和延遲類(sleep)。二者入口實際上是同一個,只不過在io類的會多調用一層。咱們這裏選擇st_send爲表明。
int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags,
st_utime_t timeout)
{
int n;
while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return -1;
/ Wait until the socket becomes writable /
if (st_netfd_poll(fd, POLLOUT, timeout) < 0)
return -1;
}
return n;
}
本質上全部的st函數都是以異步接口+ st_netfd_poll來實現的。在st_netfd_poll之內,會去調用st_poll,而st_poll本質上會調用而且切換線程。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
struct pollfd pd;
int n;
pd.fd = fd->osfd;
pd.events = (short) how;
pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0)
return -1;
if (n == 0) {
/ Timed out /
errno = ETIME;
return -1;
}
if (pd.revents & POLLNVAL) {
errno = EBADF;
return -1;
}
return 0;
}
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
/ If we timed out, the pollq might still be on the ioq. Remove it /
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
/ Count the number of ready descriptors /
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}
那麼到此爲止,st_poll中就出現了咱們最關心的調度部分了。
當一個線程進行調度的時候通常都是poll_add(若是是io操做),add_queue, _ST_SWITCH_CONTEXT完成一次調度。根據不一樣的類型,會add到不一樣的queue。例如須要超時,則會add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,則是最關鍵的切換線程操做了。
_ST_SWITCH_CONTEXT實際上是一個宏,它的本質是調用了MD_SETJMP和_st_vp_schedule().
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \
if (!MD_SETJMP((_thread)->context)) { \
_st_vp_schedule(); \
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \
ST_END_MACRO
這個函數其實就是一個完成的線程切換了。在st裏線程的切換會使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其實就是st使用匯編本身寫的setjmp和longjmp函數(glibc),效果也是幾乎等效的。(由於st自己會作平臺適配,因此咱們以x86-64的彙編爲例)
/*
*/
.file "md.S"
.text
/ _st_md_cxt_save(__jmp_buf env) /
.globl _st_md_cxt_save
.type _st_md_cxt_save, @function
.align 16
_st_md_cxt_save:
/*
*/
movq %rbx, (JB_RBX*8)(%rdi)
movq %rbp, (JB_RBP*8)(%rdi)
movq %r12, (JB_R12*8)(%rdi)
movq %r13, (JB_R13*8)(%rdi)
movq %r14, (JB_R14*8)(%rdi)
movq %r15, (JB_R15*8)(%rdi)
/ Save SP /
leaq 8(%rsp), %rdx
movq %rdx, (JB_RSP*8)(%rdi)
/ Save PC we are returning to /
movq (%rsp), %rax
movq %rax, (JB_PC*8)(%rdi)
xorq %rax, %rax
ret
.size _st_md_cxt_save, .-_st_md_cxt_save
//
/ _st_md_cxt_restore(__jmp_buf env, int val) /
.globl _st_md_cxt_restore
.type _st_md_cxt_restore, @function
.align 16
_st_md_cxt_restore:
/*
*/
movq (JB_RBX*8)(%rdi), %rbx
movq (JB_RBP*8)(%rdi), %rbp
movq (JB_R12*8)(%rdi), %r12
movq (JB_R13*8)(%rdi), %r13
movq (JB_R14*8)(%rdi), %r14
movq (JB_R15*8)(%rdi), %r15
/ Set return value /
test %esi, %esi
mov $01, %eax
cmove %eax, %esi
mov %esi, %eax
movq (JB_PC*8)(%rdi), %rdx
movq (JB_RSP*8)(%rdi), %rsp
/ Jump to saved PC /
jmpq *%rdx
.size _st_md_cxt_restore, .-_st_md_cxt_restore
//
MD_SETJMP的時候,會使用匯編把全部寄存器的信息保留下來,而MD_LONGJMP則會把全部的寄存器信息從新加載出來。二者配合使用的時候,能夠完成一次函數間的跳轉。
那麼咱們已經看到了MD_SETJMP的調用,MD_LONGJMP調用在哪兒呢?
讓咱們繼續看下去,在最一開始,咱們就說起過_st_vp_schedule()這個核心函數。
void _st_vp_schedule(void)
{
_st_thread_t *thread;
if (_ST_RUNQ.next != &_ST_RUNQ) {
/ Pull thread off of the run queue /
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
/ If there are no threads to run, switch to the idle thread /
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/ Resume the thread /
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
這個函數其實很是簡單,基本工做原理能夠認爲是執行如下幾步: 1.查看當前RUNQ是否有能夠調用的,若是有,則RUNQ pop一個thread。 2. 若是沒有,則運行idle_thread。 3. 調用_ST_RESTORE_CONTEXT。
那麼_ST_RESTORE_CONTEXT作了什麼呢?
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \
MD_LONGJMP((_thread)->context, 1); \
ST_END_MACRO
簡單來講,_ST_RESTORE_CONTEXT就是調用了咱們以前所沒有看到的MD_LONGJMP。
因此,咱們能夠簡單地認爲,在攜程須要schedule的時候,會先把自身當前的棧經過MD_SETJMP保存起來,當線程被schedule再次調度出來的時候,則會使用MD_SETJMP來還原棧,完成一次協程切換。
而後咱們來看看idle_thread作了什麼。
雖然這個協程名字叫作idle,可是其實作了不少的事情。
void _st_idle_thread_start(void arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
/ Idle vp till I/O is ready or the smallest timeout expired /
_ST_VP_IDLE();
/ Check sleep queue for expired threads /
_st_vp_check_clock();
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
/ No more threads /
exit(0);
/ NOTREACHED /
return NULL;
}
總的來講,idle_thread作了兩件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是檢查定時器是否超時,若是超時了,則設置超時標記以後,放回RUNQ。而_ST_VP_IDLE,其實就是查看io是否已經ready了。例如linux的話,則會調用epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
_st_epoll_data->evtlist_size, timeout)去查看是否有可響應的io。timeout值會根據當前空閒狀況進行變化,一般來講會是一個極小的值。
那麼看到這裏,總體的線程調度已經所有走完了。(詳見前面最一開始的流程圖)整體流程總結來講基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。
因此對於st而言,因此的調度,是基於用戶調用。那麼若是用戶一直不調用st_xxx()(例如計算密集性服務),st也就沒法進行協程切換,那麼其餘協程也就產生極大的阻塞了。這也是爲何st並不太合適計算密集型的緣由(其實單線程框架大多都不合適計算密集型)
想要閱讀更多技術乾貨文章,歡迎關注網易雲信博客。瞭解網易雲信,來自網易核心架構的通訊與視頻雲服務。