ucore在前面的實驗中實現了進程/線程機制,並在lab6中實現了搶佔式的線程調度機制。基於中斷的搶佔式線程調度機制使得線程在執行的過程當中隨時可能被操做系統打斷,被阻塞掛起而令其它的線程得到CPU。多個線程併發的執行,大大提高了非cpu密集型應用程序的cpu吞吐量,使得計算機系統中寶貴的cpu硬件資源獲得了充分利用。html
操做系統提供的內核線程併發機制的優勢是明顯的,但同時也帶來了一些問題,其中首當其衝的即是線程安全問題。java
線程安全指的是在擁有共享數據的多條線程並行執行的程序中,線程安全的代碼會經過同步機制保證各個線程均可以正常且正確的執行,不會出現數據污染等意外狀況。node
舉一個經典的例子:在高級語言中對於某一共享整型變量i(假設i=5)進行的i++操做,在最終的機器代碼中會被分解爲幾個更細緻的機器指令:git
1. 從內存的對應地址中讀取出變量i的值(高級語言的變量在機器層面表現爲一個內存地址),寫入cpu的寄存器中(假設是edx)程序員
2. 對寄存器edx進行+1運算(運算後edx寄存器中的值爲5+1=6)github
3. 將edx的值寫入變量i對應的內存空間中(在高級語言層面看,寫入edx中的新值後i變成了6)redis
經過以前lab5/lab6的學習,咱們知道在i++具體的機器指令序列執行的每一步過程當中,操做系統均可能經過時鐘中斷打斷對應線程的執行,進行線程的上下文切換。機器指令是原子性的,但高級語言中的一條指令底層可能對應多個機器指令,在執行的過程當中可能被中斷介入,沒法保證執行的連貫性。數組
例如,存在兩個併發執行的線程a、線程b,都對線程間的共享變量i(i=5)進行了i++操做。安全
兩個線程的執行i++時的機器指令流按照時間順序依次爲: 網絡
1. 線程a讀取內存中變量i的值,寫入寄存器edx(此時內存中i的值爲5,edx的值爲5)。
2. 線程a令edx進行+1運算,此時寄存器edx的值爲6。
3. 操做系統處理時鐘中斷,發現線程a的時間片已經用完,將其掛起,保存線程a的上下文(此時線程a的寄存器上下文中edx=6);並調度線程b開始獲取cpu執行。
4. 線程b讀取內存中變量i的值,寫入寄存器edx(此時內存中i的值爲5,edx的值爲5)。
5. 線程b令edx進行+1運算,此時寄存器edx的值爲6。
6. 操做系統處理時鐘中斷,發現線程b的時間片已經用完,將其掛起,保存線程b的上下文(此時線程b的寄存器上下文中edx=6);並調度線程a開始獲取cpu執行。
7. 線程a恢復現場繼續往下執行,將現場恢復後edx的值寫回內存中變量i對應的內存地址中,寫回後變量i=6。
8. 操做系統處理時鐘中斷,發現線程a的時間片已經用完,將其掛起;並調度線程a開始獲取cpu執行。
9. 線程b恢復現場繼續往下執行,將現場恢復後edx的值寫會內存中變量i對應的內存地址中,寫回後變量i=6。
上述的例子中,因爲操做系統的搶佔式調度以及高級語言中i++操做的非原子性,使得本來初始值爲5的變量i,在執行兩次i++以後獲得的並非預期的7,而是錯誤的6。這還僅僅是兩個併發線程對於一個共享變量的操做問題,實際的程序中會涉及到更多的併發線程和共享變量,使得所編寫的多線程併發程序正確性沒法獲得保證。
在絕大多數狀況下,程序的正確性都比性能重要的多。操做系統在引入搶佔式調度的線程併發機制的同時,也須要提供相應的手段來解決線程安全問題。
解決線程安全問題,主要有兩個思路:一是消除程序的併發性;二是阻止多個線程併發的訪問共享資源(共享內存、共享文件、共享外設等等)的訪問,即互斥:使得一個線程在訪問某一共享資源時,其它的線程不能進行一樣的操做。
第一種思路被一些I/O密集型的應用程序所使用,即整個程序(進程)中只有一個線程在工做,經過操做系統底層提供的i/o多路複用機制進行工做,早期的redis以及nodeJS就是工做在單線程模型下的。單線程工做的應用程序因爲不存在多個線程併發執行的場景,消除了線程的併發性,天然也不須要處理線程安全問題了。
而操做系統解決線程安全問題的方式採用的是第二種思路(通用操做系統是用於同時爲大量進程、線程服務的,所以不能再回過頭來禁止併發),經過一些機制限制併發線程同時訪問會引發線程安全問題的共享變量,保證訪問的互斥性。
經過lab7的學習,將可以深刻學習操做系統底層實現線程同步、互斥機制,理解信號量、條件變量、管程等同步互斥機制的工做原理;也能夠對更上層的如java中的synchronized、AQS悲觀鎖、管程monitor、notify/wait等線程併發同步機制有更深的理解。
lab7是創建在以前實驗的基礎之上的,須要先理解以前的實驗才能更好的理解lab7中的內容。
能夠參考一下我關於前面實驗的博客:
1. ucore操做系統學習(一) ucore lab1系統啓動流程分析
2. ucore操做系統學習(二) ucore lab2物理內存管理分析
3. ucore操做系統學習(三) ucore lab3虛擬內存管理分析
4. ucore操做系統學習(四) ucore lab4內核線程管理
5. ucore操做系統學習(五) ucore lab5用戶進程管理
6. ucore操做系統學習(六) ucore lab6線程調度器
ucore在lab7中的內容大體分爲如下幾個部分:
1. 實現等待隊列
2. 實現信號量
3. 使用信號量解決哲學家就餐問題
4. 基於信號量實現條件變量
5. 基於信號量和條件變量實現管程
6. 使用管程解決哲學家就餐問題
前面提到,ucore在lab7中實現的同步機制是基於休眠/喚醒機制的。爲了保證線程對於臨界區訪問的互斥性,在前一個線程已經進入了臨界區後,後續要訪問臨界區的線程會被阻塞以等待前一個線程離開臨界區,在以前進入臨界區的線程離開臨界區後被阻塞的線程會被再次喚醒得到進入臨界區的資格。
在有許多線程併發時,可能會有不止一個線程被阻塞在對應的臨界區,爲此抽象出了等待隊列結構(wait_queue)用於維護這一被阻塞線程的集合。當線程因爲互斥而被阻塞在臨界區時,將其加入等待隊列並放棄cpu進入阻塞態;當以前得到臨界區訪問權限的線程離開後,再從對應的等待隊列中選擇一個被阻塞、處於等待狀態的線程喚醒,被喚醒的線程能接着進入臨界區。
利用等待隊列,使得自始至終都只有最多一個線程在臨界區中,保證了互斥性;而線程在等待隊列中的休眠(阻塞)/喚醒動做,則實現了線程之間對於臨界區訪問的同步。
等待隊列固然並不僅適用於線程併發同步,當線程進入等待狀態以等待某一特定完成事件時(定時休眠一段時間、等待阻塞IO讀寫完成等等事件),底層均可以使用等待隊列來實現。
ucore在/kern/sync目錄下的wait.c、wait.h中實現了等待隊列wait_queue、等待隊列節點項wait_t以及相關的函數。
ucore的等待隊列底層是經過雙向鏈表結構實現的。和前面的實驗相似的,提供了一個宏定義le2wait用於訪問wait_link節點項對應的wait_t結構。
等待隊列結構:
/** * 等待隊列 * */ typedef struct { // 等待隊列的頭結點(哨兵節點) list_entry_t wait_head; } wait_queue_t; struct proc_struct; /** * 等待隊列節點項 * */ typedef struct { // 關聯的線程 struct proc_struct *proc; // 喚醒標識 uint32_t wakeup_flags; // 該節點所屬的等待隊列 wait_queue_t *wait_queue; // 等待隊列節點 list_entry_t wait_link; } wait_t; #define le2wait(le, member) \ to_struct((le), wait_t, member)
等待隊列結構底層操做:
// 初始化wait_t等待隊列項 void wait_init(wait_t *wait, struct proc_struct *proc); // 初始化等待隊列 void wait_queue_init(wait_queue_t *queue); // 將wait節點項插入等待隊列 void wait_queue_add(wait_queue_t *queue, wait_t *wait); // 將wait項從等待隊列中移除 void wait_queue_del(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列中wait節點的下一項 wait_t *wait_queue_next(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列中wait節點的前一項 wait_t *wait_queue_prev(wait_queue_t *queue, wait_t *wait); // 獲取等待隊列的第一項 wait_t *wait_queue_first(wait_queue_t *queue); // 獲取等待隊列的最後一項 wait_t *wait_queue_last(wait_queue_t *queue); // 等待隊列是否爲空 bool wait_queue_empty(wait_queue_t *queue); // wait項是否在等待隊列中 bool wait_in_queue(wait_t *wait);
// 將wait項從等待隊列中刪除(若是存在的話) #define wait_current_del(queue, wait) \ do { \ if (wait_in_queue(wait)) { \ wait_queue_del(queue, wait); \ } \ } while (0) #endif /* !__KERN_SYNC_WAIT_H__ */
/** * 初始化wait_t等待隊列項 * */ void wait_init(wait_t *wait, struct proc_struct *proc) { // wait項與proc創建關聯 wait->proc = proc; // 等待的狀態 wait->wakeup_flags = WT_INTERRUPTED; // 加入等待隊列 list_init(&(wait->wait_link)); } /** * 初始化等待隊列 * */ void wait_queue_init(wait_queue_t *queue) { // 等待隊列頭結點初始化 list_init(&(queue->wait_head)); } /** * 將wait節點項插入等待隊列 * */ void wait_queue_add(wait_queue_t *queue, wait_t *wait) { assert(list_empty(&(wait->wait_link)) && wait->proc != NULL); // wait項與等待隊列創建關聯 wait->wait_queue = queue; // 將wait項插入頭結點前 list_add_before(&(queue->wait_head), &(wait->wait_link)); } /** * 將wait項從等待隊列中移除 * */ void wait_queue_del(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_del_init(&(wait->wait_link)); } /** * 獲取等待隊列中wait節點的下一項 * */ wait_t * wait_queue_next(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_next(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的下一項不是頭結點,將其返回 return le2wait(le, wait_link); } return NULL; } /** * 獲取等待隊列中wait節點的前一項 * */ wait_t * wait_queue_prev(wait_queue_t *queue, wait_t *wait) { assert(!list_empty(&(wait->wait_link)) && wait->wait_queue == queue); list_entry_t *le = list_prev(&(wait->wait_link)); if (le != &(queue->wait_head)) { // *wait的前一項不是頭結點,將其返回 return le2wait(le, wait_link); } return NULL; } /** * 獲取等待隊列的第一項 * */ wait_t * wait_queue_first(wait_queue_t *queue) { // 獲取頭結點的下一項 list_entry_t *le = list_next(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 頭結點的下一項不是頭結點,將其返回 return le2wait(le, wait_link); } // 頭結點的下一項仍是頭結點,說明等待隊列爲空(只有一個wait_head哨兵節點) return NULL; } /** * 獲取等待隊列的最後一項 * */ wait_t * wait_queue_last(wait_queue_t *queue) { // 獲取頭結點的前一項 list_entry_t *le = list_prev(&(queue->wait_head)); if (le != &(queue->wait_head)) { // 頭結點的前一項不是頭結點,將其返回 return le2wait(le, wait_link); } // 頭結點的前一項仍是頭結點,說明等待隊列爲空(只有一個wait_head哨兵節點) return NULL; } /** * 等待隊列是否爲空 * */ bool wait_queue_empty(wait_queue_t *queue) { return list_empty(&(queue->wait_head)); } /** * wait項是否在等待隊列中 * */ bool wait_in_queue(wait_t *wait) { return !list_empty(&(wait->wait_link)); }
等待隊列休眠/喚醒等高層操做:
等待隊列對於線程的休眠、喚醒對應的高級操做依賴於上面介紹的、底層的等待隊列增刪改查操做。
// 將等待隊列中的wait項對應的線程喚醒 void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del); // 將等待隊列中的第一項對應的線程喚醒 void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 將等待隊列中的全部項對應的線程所有喚醒 void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del); // 令對應wait項加入當前等待隊列;令當前線程阻塞休眠,掛載在該等待隊列中 void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state);
/** * 將等待隊列中的wait項對應的線程喚醒 * */ void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del) { if (del) { // 將wait項從等待隊列中刪除 wait_queue_del(queue, wait); } // 設置喚醒的緣由標識 wait->wakeup_flags = wakeup_flags; // 喚醒對應線程 wakeup_proc(wait->proc); } /** * 將等待隊列中的第一項對應的線程喚醒 * */ void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { wakeup_wait(queue, wait, wakeup_flags, del); } } /** * 將等待隊列中的全部項對應的線程所有喚醒 * */ void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del) { wait_t *wait; if ((wait = wait_queue_first(queue)) != NULL) { if (del) { do { wakeup_wait(queue, wait, wakeup_flags, 1); } while ((wait = wait_queue_first(queue)) != NULL); } else { do { wakeup_wait(queue, wait, wakeup_flags, 0); } while ((wait = wait_queue_next(queue, wait)) != NULL); } } } /** * 令對應wait項加入當前等待隊列;令當前線程阻塞休眠,掛載在該等待隊列中 * */ void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state) { assert(current != NULL); wait_init(wait, current); current->state = PROC_SLEEPING; current->wait_state = wait_state; wait_queue_add(queue, wait); }
信號量是一種同步互斥機制的實現,廣泛存在於如今的各類操做系統內核裏,最先是由著名計算機科學家Dijkstra提出。
ucore信號量定義:
信號量的定義和使用很是簡單和基礎,包含了一個信號量的值value以及用於線程同步的等待隊列。
/** * 信號量 * */ typedef struct { // 信號量值 int value; // 信號量對應的等待隊列 wait_queue_t wait_queue; } semaphore_t; /** * 初始化信號量 * */ void sem_init(semaphore_t *sem, int value) { sem->value = value; // 初始化等待隊列 wait_queue_init(&(sem->wait_queue)); }
信號量的主要操做分別是down和up,對應於Dijkstra提出信號量時提出的P/V操做。
信號量做爲同步互斥的基本結構,其down/up操做必須是原子性的,沒法被打斷髮生上下文切換。令軟件程序表現出原子性的方法有不少,因爲ucore是運行在單核的80386cpu上的,簡單起見便直接使用關閉中斷的方式來實現信號量操做的原子性(多核cpu的狀況下,關閉單核的中斷是不夠的,而關閉全部核心的中斷則性能損失太大,須要採起鎖總線等其它手段來實現軟件原子性)。
信號量的down操做:
信號量的down操做,是請求獲取一個信號量。
當信號量的value值大於0時,說明還能容納當前線程進入臨界區。
當信號量的value值等於0時,說明已經沒法容納更多的線程了,此時須要將當前線程阻塞在信號量的等待隊列上,等待信號量的up操做將其喚醒。
/** * 信號量down操做 扣減信號量 * 當信號量value不足時將當前線程阻塞在信號量上,等待其它線程up操做時將其喚醒 * */ static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暫時關閉中斷,保證信號量的down操做是原子操做 local_intr_save(intr_flag); if (sem->value > 0) { // 信號量對應的value大於0,還有權使用 sem->value --; local_intr_restore(intr_flag); return 0; } // 信號量對應的value小於等於0,須要阻塞當前線程 wait_t __wait, *wait = &__wait; // 令當前線程掛在信號量的阻塞隊列中 wait_current_set(&(sem->wait_queue), wait, wait_state); // 恢復中斷,原子操做結束 local_intr_restore(intr_flag); // 當前線程進入阻塞狀態了,進行一次調度 schedule(); local_intr_save(intr_flag); // 喚醒後,原子操做將當前項從信號量的等待隊列中刪除 wait_current_del(&(sem->wait_queue), wait); local_intr_restore(intr_flag); if (wait->wakeup_flags != wait_state) { // 若是等待線程喚醒的標識與以前設置的參數wait_state不一致,將其狀態返回給調用方作進一步判斷 return wait->wakeup_flags; } return 0; }
信號量的up操做:
信號量的up操做,是增長一個信號量中的值。
當增長信號量值時發現當前信號量的等待隊列爲空時,則說明當前沒有線程被阻塞、須要進入信號量管制的臨界區中,簡單的將信號量值加1。
當增長信號量時發現等待隊列不爲空,則說明存在線程想要進入臨界區中,卻因爲沒有知足信號量的條件,被阻塞在了臨界區外。此時便從信號量的等待隊列中挑選出最先被阻塞的線程,將其喚醒,使得其得以進入臨界區。
/** * 信號量up操做 增長信號量或喚醒被阻塞在信號量上的一個線程(若是有的話) * */ static __noinline void __up(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; // 暫時關閉中斷,保證信號量的up操做是原子操做 local_intr_save(intr_flag); { wait_t *wait; if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) { // 信號量的等待隊列爲空,說明沒有線程等待在該信號量上 // 信號量value加1 sem->value ++; } else { assert(wait->proc->wait_state == wait_state); // 將等待隊列中的對應等待線程喚醒 wakeup_wait(&(sem->wait_queue), wait, wait_state, 1); } } local_intr_restore(intr_flag); }
信號量的down與up操做關係十分緊密,互相對照着看能夠更好的理解其工做原理。
互斥信號量:
value值被初始化爲1的信號量比較特殊,稱爲二元信號量,也被叫作互斥信號量。互斥信號量可以做爲mutex互斥鎖,用於保證臨界區中數據不會被線程併發的訪問。
哲學家就餐問題是Dijkstra提出的一個經典的多線程同步問題。大概場景是在一個環形的圓桌上,坐着五個哲學家,而桌上有五把叉子和五個碗。一個哲學家平時進行思考,飢餓時便試圖取用其左右最靠近他的叉子,只有在他拿到兩隻叉子時才能進餐。進餐完畢,放下叉子繼續思考。
解決哲學家就餐問題的基本思路是使用線程模擬哲學家,每一個線程對應一個活動着的哲學家。可是因爲5個併發活動的哲學家線程爭搶僅有的5把叉子,且哲學家只有在同時拿到兩根叉子時才能進餐,若是沒有良好的同步機制對這5個哲學家線程進行協調,那麼哲學家線程互相之間容易發生死鎖(例如,五個哲學家線程同時拿起了本身左手邊的叉子,都沒法拿起本身右邊的叉子,互相等待着。哲學家之間將永遠沒法進餐,紛紛餓死)。
使用Dijkstra提出的信號量機制能夠很好的解決哲學家就餐問題,下面看看ucore中是如何使用信號量解決哲學家就餐問題的。
哲學家線程主體執行邏輯:
ucore的lab7中的check_sync函數是整個lab7實驗的總控函數。在check_sync的前半部分使用kern_thread函數建立了N(N=5)個哲學家內核線程,用於執行philosopher_using_semaphore,模擬哲學家就餐問題。
philosopher_using_semaphore中哲學家循環往復的進行以下操做:
1. 哲學家進行思考(經過do_sleep系統調用進行休眠阻塞,模擬哲學家思考)
2. 經過phi_take_forks_sema函數嘗試着同時拿起左右兩個叉子(若是沒法拿到左右叉子,則會陷入阻塞狀態)
3. 哲學家進行就餐(經過do_sleep系統調用進行休眠阻塞,模擬哲學家就餐)
4. 經過phi_put_forks_sema函數同時放下左右兩個叉子
拿起叉子的phi_take_forks_sema函數和放下叉子phi_put_forks_sema的函數內部都是經過信號量進行同步的,在下面進行更進一步的分析。
#define N 5 /* 哲學家數目 */ #define LEFT (i-1+N)%N /* i的左鄰號碼 */ #define RIGHT (i+1)%N /* i的右鄰號碼 */ #define THINKING 0 /* 哲學家正在思考 */ #define HUNGRY 1 /* 哲學家想取得叉子 */ #define EATING 2 /* 哲學家正在吃麪 */ #define TIMES 4 /* 吃4次飯 */ #define SLEEP_TIME 10 void check_sync(void){ int i; //check semaphore 信號量解決哲學家就餐問題 sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } 。。。 條件變量(管程)解決哲學家就餐問題(暫時忽略) } //---------- philosophers problem using semaphore ---------------------- int state_sema[N]; /* 記錄每一個人狀態的數組 */ /* 信號量是一個特殊的整型變量 */ semaphore_t mutex; /* 臨界區互斥 */ semaphore_t s[N]; /* 每一個哲學家一個信號量 */ /** * 哲學家線程主體執行邏輯 * */ int philosopher_using_semaphore(void * arg) /* i:哲學家號碼,從0到N-1 */ { int i, iter=0; i=(int)arg; cprintf("I am No.%d philosopher_sema\n",i); while(iter++<TIMES) { /* 無限循環 */ cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i); /* 哲學家正在思考 */ // 使用休眠阻塞來模擬思考(哲學家線程阻塞N秒) do_sleep(SLEEP_TIME); // 哲學家嘗試着去拿左右兩邊的叉子(若是沒拿到會阻塞) phi_take_forks_sema(i); /* 須要兩隻叉子,或者阻塞 */ cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i); /* 進餐 */ // 使用休眠阻塞來模擬進餐(哲學家線程阻塞N秒) do_sleep(SLEEP_TIME); // 哲學家就餐結束,將叉子放回桌子。 // 當發現以前有臨近的哲學家嘗試着拿左右叉子就餐時卻沒有成功拿到,嘗試着喚醒對應的哲學家 phi_put_forks_sema(i); /* 把兩把叉子同時放回桌子 */ } cprintf("No.%d philosopher_sema quit\n",i); return 0; }
phi_take_forks_sema函數表示哲學家嘗試着拿起左右叉子想要就餐;而phi_put_forks_sema函數表示哲學家進餐結束後放下左右叉子。
二者都是經過全局的互斥信號量mutex的down操做進行全局的互斥,保證在同一時刻只有一個哲學家線程可以進入臨界區,對臨界區的資源叉子進行拿起/放下操做。對不一樣的哲學家線程進行互斥,保證查看左右叉子的狀態時不會出現併發問題。在後面對mutex的up操做用於釋放mutex互斥信號量,以離開臨界區,喚醒可能阻塞在mutex信號量中的其它哲學家線程,讓阻塞在mutex信號量中的另外一個線程得以進入臨界區。
phi_take_forks_sema函數分析:
在執行phi_take_forks_sema拿叉子時,經過關鍵的phi_test_sema函數進行條件的判斷,判斷當前哲學家線程i的左右哲學家線程是否都未就餐。
若是條件知足(在拿叉子時,phi_test_sema前哲學家i已經被預先設置爲HUNGRY飢餓狀態了),則表明當前哲學家i能夠進餐(其拿起了左右叉子,也表明着其相鄰的左右哲學家沒法就餐)。
而若是條件不知足則會在phi_take_forks_sema的最後,被down(&s[i])阻塞在信號量s[i]上,等待其被左右兩旁就餐完畢的哲學家將其喚醒。
phi_put_forks_sema函數分析:
在執行phi_put_forks_sema放下叉子時,首先經過設置state_sema[i]的狀態爲Thinking,表明哲學家i已經就餐完畢從新進入思考狀態。同時哲學家i在放下叉子後,經過phi_test_sema(LEFT)和phi_test_sema(RIGHT)來判斷相鄰的哲學家在本身就餐的這段時間是否也陷入了飢餓狀態,卻因爲暫時拿不到叉子而被阻塞了(LEFT和RIGHT宏利用取模,解決下標迴環計算的問題)。若是確實存在這種狀況,經過phi_test_sema函數嘗試着令相鄰的哲學家進行就餐(也許被阻塞哲學家的隔壁另外一邊的哲學家依然在就餐,那麼此時依然沒法將其喚醒就餐;而須要等到另外一邊的哲學家就餐完畢來嘗試將其喚醒)。
經過互斥信號量mutex實現哲學家線程就餐對臨界區資源-叉子訪問的互斥性,避免了併發時對叉子狀態判斷不許確的狀況產生;同時利用信號量數組semaphore_t s[N]對哲學家拿取、放下叉子的操做進行同步,使得哲學家們在叉子資源有限、衝突的狀況下有序的就餐,不會出死鎖、飢餓等現象。
/** * 哲學家i拿起左右叉子 */ void phi_take_forks_sema(int i) /* i:哲學家號碼從0到N-1 */ { // 拿叉子時須要經過mutex信號量進行互斥,防止併發問題(進入臨界區) down(&mutex); // 記錄下哲學家i飢餓的事實(執行phi_take_forks_sema嘗試拿叉子,說明哲學家i進入了HUNGRY飢餓狀態) state_sema[i]=HUNGRY; // 試圖同時獲得左右兩隻叉子 phi_test_sema(i); // 離開臨界區(喚醒可能阻塞在mutex上的其它線程) up(&mutex); // phi_test_sema中若是成功拿到叉子進入了就餐狀態,會先執行up(&s[i]),再執行down(&s[i])時便不會阻塞 // 反之,若是phi_test_sema中沒有拿到叉子,則down(&s[i])將會令哲學家i阻塞在信號量s[i]上 down(&s[i]); } /** * 哲學家i放下左右叉子 */ void phi_put_forks_sema(int i) /* i:哲學家號碼從0到N-1 */ { // 放叉子時須要經過mutex信號量進行互斥,防止併發問題(進入臨界區) down(&mutex); /* 進入臨界區 */ // 哲學家進餐結束(執行phi_put_forks_sema放下叉子,說明哲學家已經就餐完畢,從新進入THINKING思考狀態) state_sema[i]=THINKING; // 當哲學家i就餐結束,放下叉子時。須要判斷左、右臨近的哲學家在本身就餐的這段時間內是否也進入了飢餓狀態,卻由於本身就餐拿走了叉子而沒法同時得到左右兩個叉子。 // 爲此哲學家i在放下叉子後須要嘗試着判斷在本身放下叉子後,左/右臨近的、處於飢餓的哲學家可否進行就餐,若是能夠就喚醒阻塞的哲學家線程,並令其進入就餐狀態(EATING) phi_test_sema(LEFT); /* 看一下左鄰居如今是否能進餐 */ phi_test_sema(RIGHT); /* 看一下右鄰居如今是否能進餐 */ up(&mutex); /* 離開臨界區q(喚醒可能阻塞在mutex上的其它線程) */ } /** * 判斷哲學家i是否能夠拿起左右叉子 */ void phi_test_sema(i) /* i:哲學家號碼從0到N-1 */ { // 當哲學家i處於飢餓狀態(HUNGRY),且其左右臨近的哲學家都沒有在就餐狀態(EATING) if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING &&state_sema[RIGHT]!=EATING) { // 哲學家i餓了(HUNGRY),且左右兩邊的叉子都沒人用。 // 令哲學家進入就餐狀態(EATING) state_sema[i]=EATING; // 喚醒阻塞在對應信號量上的哲學家線程(當是哲學家線程i本身執行phi_test_sema(i)時,則信號量直接加1,抵消掉phi_take_forks_sema中的down操做,表明直接拿起叉子就餐成功而不用進入阻塞態) up(&s[i]); } }
ucore中的條件變量是基於信號量實現的,同時條件變量也做爲管程Monitor結構的重要組成部分。
條件變量condvar結構定義:
/** * 條件變量 * */ typedef struct condvar{ // 條件變量相關的信號量,用於阻塞/喚醒線程 semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc // 等待在條件變量之上的線程數 int count; // the number of waiters on condvar // 擁有該條件變量的monitor管程 monitor_t * owner; // the owner(monitor) of this condvar } condvar_t;
管程monitor結構定義:
/** * 管程 * */ typedef struct monitor{ // 管程控制併發的互斥鎖(應該被初始化爲1的互斥信號量) semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1 // 管程內部協調各併發線程的信號量(線程能夠經過該信號量掛起本身,其它併發線程或者被喚醒的線程能夠反過來喚醒它) semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc. // 休眠在next信號量中的線程個數 int next_count; // the number of of sleeped signaling proc // 管程所屬的條件變量(能夠是數組,對應n個條件變量) condvar_t *cv; // the condvars in monitor } monitor_t;
/** * 初始化管程 * */ void monitor_init (monitor_t * mtp, size_t num_cv) { int i; assert(num_cv>0); mtp->next_count = 0; mtp->cv = NULL; // 管程的互斥信號量值設爲1(初始化時未被鎖住) sem_init(&(mtp->mutex), 1); //unlocked // 管程的協調信號量設爲0,當任何一個線程發現不知足條件時,當即阻塞在該信號量上 sem_init(&(mtp->next), 0); // 爲條件變量分配內存空間(參數num_cv指定管程所擁有的條件變量的個數) mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv); assert(mtp->cv!=NULL); // 構造對應個數的條件變量 for(i=0; i<num_cv; i++){ mtp->cv[i].count=0; // 條件變量信號量初始化時設置爲0,當任何一個線程發現不知足條件時,當即阻塞在該信號量上 sem_init(&(mtp->cv[i].sem),0); mtp->cv[i].owner=mtp; } }
條件變量的等待操做實現:
cond_wait函數實現條件變量的wait操做。條件變量的wait操做和信號量的down功能相似。當條件變量對應的條件不知足時,經過信號量的down操做,令當前線程阻塞、等待在條件變量所屬的信號量上。
// Suspend calling thread on a condition variable waiting for condition Atomically unlocks // mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures /** * 條件變量阻塞等待操做 * 令當前線程阻塞在該條件變量上,等待其它線程將其經過cond_signal將其喚醒。 * */ void cond_wait (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cv.count ++; * if(mt.next_count>0) * signal(mt.next) * else * signal(mt.mutex); * wait(cv.sem); * cv.count --; */ // 阻塞在當前條件變量上的線程數加1 cvp->count++; if(cvp->owner->next_count > 0) // 對應管程中存在被阻塞的其它線程 // 喚醒阻塞在對應管程協調信號量next中的線程 up(&(cvp->owner->next)); else // 若是對應管程中不存在被阻塞的其它線程 // 釋放對應管程的mutex二元信號量 up(&(cvp->owner->mutex)); // 令當前線程阻塞在條件變量上 down(&(cvp->sem)); // down返回,說明已經被再次喚醒,條件變量count減1 cvp->count --; cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
條件變量的喚醒操做實現:
cond_signal函數用於實現條件變量的signal操做。條件變量的signal操做和信號量的up功能相似。當條件變量對應的條件知足時,經過信號量的up操做,喚醒阻塞在對應條件變量中的線程。
// Unlock one of threads waiting on the condition variable. /** * 條件變量喚醒操做 * 解鎖(喚醒)一個等待在當前條件變量上的線程 * */ void cond_signal (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cond_signal(cv) { * if(cv.count>0) { * mt.next_count ++; * signal(cv.sem); * wait(mt.next); * mt.next_count--; * } * } */ // 若是等待在條件變量上的線程數大於0 if(cvp->count>0) { // 須要將當前線程阻塞在管程的協調信號量next上,next_count加1 cvp->owner->next_count ++; // 令阻塞在條件變量上的線程進行up操做,喚醒線程 up(&(cvp->sem)); // 令當前線程阻塞在管程的協調信號量next上 // 保證管程臨界區中只有一個活動線程,先令本身阻塞在next信號量上;等待被喚醒的線程在離開臨界區後來反過來將本身從next信號量上喚醒 down(&(cvp->owner->next)); // 當前線程被其它線程喚醒從down函數中返回,next_count減1 cvp->owner->next_count --; } cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
條件變量與管程的交互:
仔細比對條件變量與信號量的實現,會發現大體的實現思路是一致的。但ucore中實現的條件變量是做爲管程的一部分工做的,所以在wait和signal操做中都額外耦合了與對應管程owner交互的地方。
在管程中進入臨界區的線程發現條件不知足而進行條件變量的wait操做時,須要釋放管程中臨界區的鎖,在wait操做掛起自身時令其它想要進入管程內的線程得到臨界區的訪問權限。
在管程中臨界區的線程發現某一條件獲得知足時,將執行對應條件變量的signal操做以喚醒等待在其上的某一個線程。可是因爲管程臨界區的互斥性,不能容許臨界區內有超過一個的線程在其中運行,所以執行signal操做的線程須要首先將本身阻塞掛起在管程的next信號量上,使得被喚醒的那一個線程獨佔臨界區資源。當被喚醒的線程離開臨界區時,也會及時的喚醒掛起在管程next信號量上的對應線程。
因爲併發環境下多個線程經過條件變量等同步機制交替的休眠/喚醒,邏輯執行流並非連貫的,所以條件變量和管程的實現顯得比較繞,使人費解。經過學習如何用管程解決哲學家就餐問題,看看使用管程/條件變量是如何進行線程同步互斥的,加深對條件變量、管程工做機制的理解。
在checkSync函數的後半部分,是關於如何使用管程解決哲學家就餐問題。在check_sync的後半部分建立了N(N=5)個哲學家內核線程,用於執行philosopher_using_condvar函數,模擬哲學家就餐問題。
philosopher_using_condvar中哲學家循環往復的進行以下操做(總體流程和信號量的實現大致一致):
1. 哲學家進行思考(經過do_sleep系統調用進行休眠阻塞,模擬哲學家思考)
2. 經過phi_take_forks_condvar函數嘗試着同時拿起左右兩個叉子(若是沒有拿到左右叉子,陷入阻塞)
3. 哲學家進行就餐(經過do_sleep系統調用進行休眠阻塞,模擬哲學家就餐)
4. 經過phi_put_forks_condvar函數同時放下左右兩個叉子,回到思考狀態
拿起叉子的phi_take_forks_condvar函數和放下叉子phi_put_forks_condvar的函數內部都是經過條件變量進行同步的,在下面進行更進一步的分析。
checkSync函數:
void check_sync(void){ int i; //check semaphore sem_init(&mutex, 1); for(i=0;i<N;i++){ sem_init(&s[i], 0); int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } //check condition variable monitor_init(&mt, N); for(i=0;i<N;i++){ state_condvar[i]=THINKING; int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_condvar failed.\n"); } philosopher_proc_condvar[i] = find_proc(pid); set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc"); } }
phi_take_forks_condvar函數表達哲學家嘗試着拿起左右叉子想要就餐;而phi_put_forks_condvar函數表達哲學家進餐結束後放下左右叉子。
二者經過管程中的互斥信號量mutex的down操做進行全局的互斥,保證在同一時刻只有一個哲學家線程可以進入臨界區,對臨界區的資源叉子進行拿起/放下操做,對不一樣的哲學家線程進行互斥,保證查看左右叉子的狀態時不會出現併發問題。
在離開管程的臨界區時(註釋into routine in monitor和leave routine in monitor之間爲管程的臨界區代碼),當前線程會根據管程內是否存在其它線程(mtp->next_count>0)而有不一樣的操做。當發現管程中的next信號量上存在其它線程阻塞在上面時,優先喚醒next信號量上的線程(阻塞在next上的線程是因爲要喚醒等待在某一條件變量上的線程,爲了保證臨界區互斥自願被阻塞的,所以被喚醒的線程在離開臨界區後須要第一時間將其喚醒);而若是next信號量中不存在休眠的線程,那麼就和信號量的實現相似,釋放mutex互斥鎖,喚醒可能等待在其上的某一線程。
上述ucore實現的管程其線程交互的邏輯是基於Hoare語義的,此外還存在MESA語義的管程和Hansen語義的管程(MESA管程和Hansen管程實現相似前面的信號量實現哲學家就餐)。
Hoare管程在signal喚醒其它線程時會令本身陷入休眠,嚴格的保證了臨界區線程的互斥,理論上更加可靠,常見於教科書的理論中。因爲Hoare語義的管程須要額外引入一個等待隊列(next信號量),所以其性能並不如其它兩種語義的管程,現實中被使用的地方不多。
phi_take_forks_condvar函數(同時拿起左右叉子):
void phi_take_forks_condvar(int i) { // 拿叉子時須要經過mutex信號量進行互斥,防止併發問題(進入臨界區) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I am hungry // try to get fork // I am hungry // 記錄下哲學家i飢餓的事實(執行phi_take_forks_condvar嘗試拿叉子,說明哲學家i進入了HUNGRY飢餓狀態) state_condvar[i]=HUNGRY; // 試圖同時獲得左右兩隻叉子 phi_test_condvar(i); if (state_condvar[i] != EATING) { // state_condvar[i]狀態不爲EATING,說明phi_test_condvar嘗試拿左右叉子進餐失敗 cprintf("phi_take_forks_condvar: %d didn't get fork and will wait\n",i); // 等待阻塞在管程的條件變量cv[i]上 cond_wait(&mtp->cv[i]); } //--------leave routine in monitor-------------- if(mtp->next_count>0){ // 當離開管程臨界區時,若是發現存在線程等待在mtp->next上 // 在當前實驗中,執行到這裏的當前線程多是阻塞在cond_wait中被其它線程喚醒的,對應線程是經過phi_test_condvar的cond_signal操做喚醒當前線程的 // 執行cond_signal時爲了保證管程臨界區內不存在併發的線程訪問,在喚醒其它線程時,會把本身阻塞在管程的next信號量上,等待此時離開臨界區的線程將其喚醒 up(&(mtp->next)); }else{ // 當離開管程臨界區時,沒有其它線程等待在mtp->next上,直接釋放管程的互斥鎖mutex便可(喚醒可能阻塞在mutex上的其它線程) up(&(mtp->mutex)); } }
phi_put_forks_condvar函數(同時放下左右叉子):
void phi_put_forks_condvar(int i) { // 放叉子時須要經過mutex信號量進行互斥,防止併發問題(進入臨界區) down(&(mtp->mutex)); //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I ate over // test left and right neighbors // I ate over // 哲學家進餐結束(執行phi_put_forks_condvar放下叉子,說明哲學家已經就餐完畢,從新進入THINKING思考狀態) state_condvar[i]=THINKING; // test left and right neighbors // 當哲學家i就餐結束,放下叉子時。須要判斷左、右臨近的哲學家在本身就餐的這段時間內是否也進入了飢餓狀態,卻由於本身就餐拿走了叉子而沒法同時得到左右兩個叉子。 // 爲此哲學家i在放下叉子後須要嘗試着判斷在本身放下叉子後,左/右臨近的、處於飢餓的哲學家可否進行就餐,若是能夠就喚醒阻塞的哲學家線程,並令其進入就餐狀態(EATING) phi_test_condvar(LEFT); // 看一下左鄰居如今是否能進餐 phi_test_condvar(RIGHT); // 看一下右鄰居如今是否能進餐 //--------leave routine in monitor-------------- // lab7的參考答案 if(mtp->next_count>0){ cprintf("execute here mtp->next_count>0 \n\n\n\n\n\n"); up(&(mtp->next)); }else{ cprintf("execute here mtp->next_count=0 \n\n\n\n\n"); up(&(mtp->mutex)); } // 我的認爲放叉子和取叉子的狀況並不同,不會出現mtp->next_count>0的狀況,這裏只須要釋放互斥鎖便可(若是這裏理解的有問題,還請指正) // 當放叉子的線程在phi_put_forks_condvar中離開管程臨界區時,只有兩種狀況 // 1. 沒有發現鄰居能夠進餐,自身不會被阻塞 // 2. 發現有鄰居以前被拿不到叉子阻塞了,如今能夠進餐了,phi_test_condvar中的cond_signal會暫時令本身阻塞在next信號量上 // 可是很快被本身叫醒的相鄰的哲學家線程在被喚醒後一離開臨界區就會將本身喚醒,在cond_signal被喚醒後的操做中mtp->next_count會自減,而變爲0 // // 以上兩種狀況下,因爲管程自己最外面有一個mutex互斥信號量,因此不會出現兩個線程同時阻塞在next信號量中,所以也就不會出現參考答案中mtp->next_count>0的狀況 // up(&(mtp->mutex)); }
phi_test_condvar函數(判斷哲學家i是否能拿起左右叉子開始就餐):
void phi_test_condvar (i) { // 當哲學家i處於飢餓狀態(HUNGRY),且其左右臨近的哲學家都沒有在就餐狀態(EATING) if(state_condvar[i]==HUNGRY&&state_condvar[LEFT]!=EATING &&state_condvar[RIGHT]!=EATING) { cprintf("phi_test_condvar: state_condvar[%d] will eating\n",i); // 哲學家i餓了(HUNGRY),且左右兩邊的叉子都沒人用。 // 令哲學家進入就餐狀態(EATING) state_condvar[i] = EATING ; cprintf("phi_test_condvar: signal self_cv[%d] \n",i); // 喚醒阻塞在對應信號量上的哲學家線程 cond_signal(&mtp->cv[i]) ; } }
信號量是一個簡單、高效的同步互斥機制,但也正是因爲其過於底層,因此在編寫線程同步代碼時須要十分當心謹慎,對每一處信號量的使用仔細斟酌才能保證程序的正確性,對開發人員的心智是一個巨大的負擔。
而將管程做爲一個總體的結構來看的話,會發現管程雖然將控制同步的代碼邏輯抽象爲了一個固定的模板變得容易使用,但卻與要保護的臨界區業務邏輯代碼耦合的很嚴重,操做系統的開發者很難將管程控制同步的代碼植入進對應的應用程序內部。
所以操做系統一般只提供了信號量以及條件變量這種偏底層、耦合性低的同步互斥機制;而管程機制則更多的由高級語言的編譯器在語言層面實現,以簡化程序員開發複雜併發同步程序的複雜度。高級語言編譯器在編譯本地機器代碼時,能夠在須要進行同步的代碼邏輯塊中利用操做系統底層提供的信號量或是條件變量機制來實現管程。
例如java中若是在方法定義時簡單的加上synchronized關鍵字就能控制多線程環境下不會併發的執行該方法。這是由於在編譯成字節碼時,相比於普通方法額外插入了一些管程Monitor相關的同步控制代碼(管程底層依賴的信號量、條件變量機制仍是取決於對應的操做系統平臺,只不過被jvm屏蔽掉了差別,讓java程序員感知不到)。
經過ucore的lab7的學習,讓我理解了等待隊列、信號量、條件變量以及管程的大體工做原理,也對日常會接觸到的java中的synchronized、AQS、Reentrantlock其底層機制有了進一步的認識。
lab7的學習使我收穫頗豐,但這對於線程同步相關領域的學習仍是遠遠不夠。同屬於進程間通訊IPC領域的經典問題除了哲學家就餐問題外,還有讀者/寫者問題等;對於線程安全問題,除了使用休眠/喚醒進行線程上下文切換的阻塞的方式以外,還有使用CAS等重試的方法;除了信號量、條件變量等基於單機系統內的線程同步方式外,還有基於分佈式系統,經過網絡進行多機器線程同步的機制等等。雖然不夠了解的知識還有不少,但經過ucore操做系統的學習,爲我學習相關的領域知識打下了基礎,也給了我相信最終能融會貫通這些知識的信心。
這篇博客的完整代碼註釋在個人github上:https://github.com/1399852153/ucore_os_lab (fork自官方倉庫)中的lab7_answer。
但願個人博客能幫助到對操做系統、ucore os感興趣的人。存在許多不足之處,還請多多指教。