srs使用的開源c語言網絡協程庫 state thread 源碼分析

state thread是一個開源的c語言網絡協程庫,它在用戶空間實現了協程調度
st最初是由網景(Netscape)公司的MSPRNetscape Portable Runtime library)項目中剝離出來,後由SGISilicon Graphic Inc)和Yahoo!公司(前者是主力)共同開發維護。
2001年發佈v1.0以來一直到2009v1.9穩定版後未再變更

State Threads:回調終結者(必讀)
https://blog.csdn.net/caoshangpa/article/details/79565411

st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz srs修改版, https://github.com/ossrs/state-threads

st源碼編譯
tar zxvf st-1.9.tar.gz

cd st-1.9
make linux-debug          // make命令能夠查看支持的編譯選項
obj目錄有編譯生成的文件st.h, lib*.solib*.a

examples目錄有幾個例子lookupdnsproxyserver

須要的知識點
彙編語言(非必需)
線程的棧管理(非必需)
線程的調度和同步(必須)線程不一樣步的測試代碼thread.c
4 setjmp/longjmp的使用(必須)。測試代碼setjmp.c
5 epoll原理和使用(必須)。測試代碼epoll_server.c 和 epoll_client.c

測試代碼以及文檔下載地址
連接: https://pan.baidu.com/s/1kQz3S1YIt6zUwMKScrnHaQ
提取碼: pu9z


分析state_thread源碼的目的,是爲了正確的使用它
st中thread實際上是協程的概念
st_xxx分爲 io  延遲類

一些重要的數據結構
_st_vp_t _st_this_vp;     virtual processor 虛擬處理器
_st_thread_t *_st_this_thread;
_st_clist_t  run_q, io_q, zombie_q, thread_q 
_st_thread_t  *idle_thread, *sleep_q

代碼分析
st庫自帶的example業務邏輯較爲複雜,有興趣能夠看下。
爲了簡化問題,編寫了測試代碼st-1.9/examples/st_epoll.c,依據此代碼提出問題分析問題。
st_init()作了什麼?
_st_idle_thread_start()作了什麼
st_thread_create()作了什麼?
st_thread_exit()作了什麼?
st_usleep()作了什麼?
主業務邏輯(無限循環)協程是如何調度的?
監聽的文件描述符是如何調度的?
協程如何正常退出?
1 沒有設置終止條件變量(不能夠被join)的協程直接return便可退出; 
2 設置了終止條件變量(能夠被join)的協程退出時,先把本身加入到zombie_q中,而後通知等待的協程,等待的協程退出後,本身在退出。html

協程的join(鏈接)是什麼意思?
1 建立協程a的時候 st_thread_create(handle_cycle, NULL, 1, 0) 要設置爲1, 表示該協程能夠被join
2 協程b代碼裏要掉用st_thread_join(thread, retvalp),表示我要join到協程a上
3 join的意思是 協程a和協程b 有必定關聯行,在協程退出時,要先退出協程b 才能退出協程a
4 st中一個協程只能被另外一個協程join,不能被多個協程join
5 能夠被join的協程a,在沒有其餘協程join時,協程a沒法正常退出linux

st裏的mutex有什麼用?
一般狀況下st的多協程是不須要加鎖的,可是在有些狀況下須要鎖來保證原子操做,下面會詳細說明。
st_mutex_new(void); 建立鎖
st_mutex_destroy(st_mutex_t lock); 等待隊列必須爲空才能銷燬鎖
st_mutex_lock(st_mutex_t lock); 第一次掉用能得到鎖,之後掉用會加入鎖的等待隊列中(FIFO)
st_mutex_unlock(st_mutex_t lock); 釋放鎖並激活等待隊列的協程
st_mutex_trylock(st_mutex_t lock); 嘗試得到鎖不會加入到等待隊列git

st裏的cond有什麼用?
一般狀況下st的多協程是不須要條件變量的,可是有些狀況下須要條件變量來保證協程執行的前後順序,好比:協程a要先於協程b執行
st_cond_new(void); 建立條件變量
st_cond_destroy(st_cond_t cvar); 等待隊列必須爲空才能銷燬條件變量
st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限時等待條件變量,會加入條件變量的等待隊列中(FIFO),並加入到sleep_q隊列中(可能先於FIFO的順序被調度到)
st_cond_wait(st_cond_t cvar); 阻塞等待條件變量,會加入條件變量的等待隊列中(FIFO)
st_cond_signal(st_cond_t cvar); 喚醒阻塞在條件變量上的一個協程
st_cond_broadcast(st_cond_t cvar); 喚醒阻塞在條件變量上的所有協程github

這個圖要配合測試代碼 st-1.9/examples/st_epoll.c網絡

st中與調度有關的函數
st的setjmp
#define _ST_SWITCH_CONTEXT(_thread)   \                協程切換的兩個宏函數之一,中止當前協程並運行其餘協程
  ST_BEGIN_MACRO                      \
  ST_SWITCH_OUT_CB(_thread);          \                       協程切走時調用的函數,通常無論用
  if (!MD_SETJMP((_thread)->context)) \                         彙編語言實現 應該跟setjmp()同樣 首次掉用返回0
  {                                   \
    _st_vp_schedule();                \                                    核心調度函數
  }                                   \
  ST_DEBUG_ITERATE_THREADS();         \
  ST_SWITCH_IN_CB(_thread);           \                          協程切回時調用的函數,通常無論用
  ST_END_MACRO

st的longjmp
#define _ST_RESTORE_CONTEXT(_thread) \               協程切換的兩個宏函數之一,恢復線程運行
  ST_BEGIN_MACRO                     \
  _ST_SET_CURRENT_THREAD(_thread);   \                設置全局變量 _st_this_thread = _thread
  MD_LONGJMP((_thread)->context, 1); \                       彙編語言實現 應該跟longjmp()同樣返回值永遠爲1
  ST_END_MACRO

MD_SETJMP的時候,會使用匯編把全部寄存器的信息保留下來,而MD_LONGJMP則會把全部的寄存器信息從新加載出來。二者配合使用的時候,能夠完成函數間的跳轉。

st的核心調度函數
void _st_vp_schedule(void)
{
  _st_thread_t *thread;
  printf("in _st_vp_schedule\n");
  printf("_st_active_count = %d\n", _st_active_count);
  if (_ST_RUNQ.next != &_ST_RUNQ)
  {
    printf("use runq\n");
    /* Pull thread off of the run queue */
    thread = _ST_THREAD_PTR(_ST_RUNQ.next);
    _ST_DEL_RUNQ(thread);
  }
  else
  {
    printf("use idle\n");
    /* If there are no threads to run, switch to the idle thread */
    thread = _st_this_vp.idle_thread;
  }
  ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
  /* Resume the thread */
  thread->state = _ST_ST_RUNNING;
  _ST_RESTORE_CONTEXT(thread);
}

st輔助調度函數
void *_st_idle_thread_start(void *arg)
{
  printf("i'm in _st_idle_thread_start()\n");
  _st_thread_t *me = _ST_CURRENT_THREAD();

  while (_st_active_count > 0)
  {
    /* Idle vp till I/O is ready or the smallest timeout expired */
    printf("call _st_epoll_dispatch()\n");
    _ST_VP_IDLE();                                                       處理io類事件

    /* Check sleep queue for expired threads */
    _st_vp_check_clock();                                               處理延時類事件
    me->state = _ST_ST_RUNNABLE;
    _ST_SWITCH_CONTEXT(me);                                  從這裏恢復運行,而後判斷_st_active_count的值
  }
/* No more threads */
  exit(0);                                                                        整個程序退出
  /* NOTREACHED */
  return NULL;
}

會觸發協程切換的函數有哪些?
sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)
sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)
sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)
sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)

sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)

st中的interrupt
顯示調用void st_thread_interrupt(_st_thread_t *thread)會對協程設置interrupt狀態,interrupt狀態會中斷協程的本次運行(多是個循環任務),是否致使協程退出,要看協程內部對interrupt返回值的處理。下面以st_usleep()函數爲例進行說明。
[ykMac:st-1.9]# grep -nr "_ST_FL_INTERRUPT" *
common.h:311:#define _ST_FL_INTERRUPT 0x08        interrupt的宏定義
sched.c:68:  if (me->flags & _ST_FL_INTERRUPT)         59 int st_poll() ,調用函數時,判斷是否設置interrupt
sched.c:70:    me->flags &= ~_ST_FL_INTERRUPT;       若是設置就退出,退出前對interrupt取反
sched.c:107:  if (me->flags & _ST_FL_INTERRUPT)       59 int st_poll(),變爲運行協程時,判斷是否設置interrupt
sched.c:109:    me->flags &= ~_ST_FL_INTERRUPT;     若是設置就退出,退出前對interrupt取反
sched.c:551:  thread->flags |= _ST_FL_INTERRUPT;     545 void st_thread_interrupt()中設置爲interrupt
sync.c:119:  if (me->flags & _ST_FL_INTERRUPT) {       115 int st_usleep(st_utime_t usecs),調用函數時
sync.c:120:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:133:  if (me->flags & _ST_FL_INTERRUPT) {       115 int st_usleep(st_utime_t usecs),變爲運行協程時
sync.c:134:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:185:  if (me->flags & _ST_FL_INTERRUPT) {       180 int st_cond_timedwait(),調用函數時
sync.c:186:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:208:  if (me->flags & _ST_FL_INTERRUPT) {       180 int st_cond_timedwait(),變爲運行協程時
sync.c:209:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:294:  if (me->flags & _ST_FL_INTERRUPT) {       290 int st_mutex_lock(),調用函數時
sync.c:295:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:319:  if ((me->flags & _ST_FL_INTERRUPT) && lock->owner != me) {       290 int st_mutex_lock(),變運行時
sync.c:320:    me->flags &= ~_ST_FL_INTERRUPT;

115 int st_usleep(st_utime_t usecs)
116 {
117   _st_thread_t *me = _ST_CURRENT_THREAD();
118
119   if (me->flags & _ST_FL_INTERRUPT) {
120     me->flags &= ~_ST_FL_INTERRUPT;     退出前對interrupt取反
121     errno = EINTR;
122     return -1;                                                若是不對errno或返回值作處理,循環仍是會繼續的
123   }
124
125   if (usecs != ST_UTIME_NO_TIMEOUT) {
126     me->state = _ST_ST_SLEEPING;
127     _ST_ADD_SLEEPQ(me, usecs);
128   } else
129     me->state = _ST_ST_SUSPENDED;
130
131   _ST_SWITCH_CONTEXT(me);
132
133   if (me->flags & _ST_FL_INTERRUPT) {
134     me->flags &= ~_ST_FL_INTERRUPT;
135     errno = EINTR;
136     return -1;
137   }
138
139   return 0;
140 }

st的優缺點
優勢:
1 用戶空間實現協程調度,下降了用戶空間和內核空間的切換,必定程度上提升了程序效率。
2 因爲是在單核上的單線程多協程,同一時間只會有一個協程在運行,因此對於全局變量也不須要作協程同步。
   共享資源釋放函數只需作到可重入就行,所謂的可重入就是釋放以前先判斷是否爲空值,釋放後要賦空值。
3 協程使用完,直接return便可,st會回收協程資源並作協程切換。
能夠經過向run_q鏈表頭部加入協程,來實現優先調度。
5 st支持多個操做系統,好比 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS
缺點:
全部I/O操做必須使用st提供的API,只有這樣協程才能被調度器管理。
2 全部協程裏不能使用sleep(),sleep()會形成整個線程sleep。
被調度到的協程不會限制運行時長,若是有協程cpu密集型或死循環,就會嚴重阻礙其餘協程運
4 單進程單線程,只能使用單核,想要經過多個cpu提升併發能力,只能開多個程序(進程),多進程通訊較麻煩。

補充知識點
線程爲何要同步?
線程由內核自動調度
同一個進程上的線程共享該進程的整個虛擬地址空間
同一個進程上的線程代碼區是共享的,即不一樣的線程能夠執行一樣的函數
因此在併發環境中,多個線程同時對同一個內存地址進行寫入,因爲CPU寄存器時間調度上的問題,寫入數據會被屢次的覆蓋,會形成共享數據損壞,因此就要使線程同步。

2 什麼狀況下須要線程同步?
線程同步指的是 不一樣時發生,就是線程要排隊
1 多核,單進程多線程,不一樣線程會對全局變量讀寫,這種狀況才須要對線程作同步控制
2 單核,單進程多線程,不一樣線程會對全局變量讀寫,這種狀況不須要對線程作同步控制
3 多核,單進程多線程,不一樣線程不對全局變量讀寫,這種狀況不須要對線程作同步控制
4 多核,單進程多線程,不一樣線程全局變量,這種狀況不須要對線程作同步控制

問題:對於第2條,這個應該是有點兒片面,有依賴有優先級搶佔也是要同步,除非像abcde這種幾個線程幹一摸同樣的事情,並且項目之間不依賴。
有依賴 能夠理解爲 生產消費關係,雖然是 單核 單進程 多線程 可是同一時間只能有一個線程在運行,也就是說 生產和消費不會同時發生,一樣多個生產也不會同時發生,因此不須要鎖。

線程是有優先級控制,可是無論怎麼控制,只要保證同一時間只能有一個線程在運行,就不須要鎖了。
問題:對於第2條,這個應該是有點兒片面,有原子操做且原子操做過程當中有線程切換,這種是須要鎖的。
好比,線程a 第一次讀取全局變量x並作處理,而後發生線程切換(線程由內和自動調度)後切回,而後第二次讀取全局變量x並作處理,咱們想確保兩次讀取
x值相同,可是發生了線程切換x值可能被改變。

如何 確保 第一次讀取並處理和第二次讀取並處理是原子操做呢? 使用st_mutex_t

3 accept()序列化
亦稱驚羣效應,亦亦稱Zeeg難題
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html
在屢次fork本身以後,每一個進程通常將會開始阻塞在 accept() 
每當socket上嘗試進行一個鏈接,阻塞在 accept() 上的每一個進程的 accept() 都會被喚醒。
只有其中一個進程可以真正接收到這個鏈接,而剩餘的進程將會得到一個無聊的 EAGAIN 這致使了大量的CPU週期浪費實際解決方法是把一個鎖放在 accept() 調用以前,來序列化它的使用

4 Internet Applications網絡程序架構
多進程架構 Multi-Process
    一個進程服務一個鏈接,要解決數據共享問題
單進程多線程架構 Multi-Threaded 
    一個線程服務一個鏈接,要解決數據同步問題
事件驅動的狀態機架構 Event-Driven State Machine 
    事件觸發回調函數(缺點是嵌套 用戶空間實現協程調度
實際上 EDSM架構 用很複雜的方式模擬了多線程
st提供的就是EDSM機制,它在用戶空間實現協程調度
https://blog.csdn.net/caoshangpa/article/details/53282330數據結構

相關文章
相關標籤/搜索