全部的實驗報告將會在 Github 同步更新,更多內容請移步至Github:https://github.com/AngelKitty/review_the_national_post-graduate_entrance_examination/blob/master/books_and_notes/professional_courses/operating_system/sources/ucore_os_lab/docs/lab_report/html
lab7
會依賴 lab1~lab6
,咱們須要把作的 lab1~lab6
的代碼填到 lab7
中缺失的位置上面。練習 0 就是一個工具的利用。這裏我使用的是 Linux
下的系統已預裝好的 Meld Diff Viewer
工具。和 lab6
操做流程同樣,咱們只須要將已經完成的 lab1~lab6
與待完成的 lab7
(因爲 lab7
是基於 lab1~lab6
基礎上完成的,因此這裏只須要導入 lab6
)分別導入進來,而後點擊 compare
就好了。linux
而後軟件就會自動分析兩份代碼的不一樣,而後就一個個比較比較複製過去就好了,在軟件裏面是能夠支持打開對比複製了,點擊 Copy Right
便可。固然 bin
目錄和 obj
目錄下都是 make
生成的,就不用複製了,其餘須要修改的地方主要有如下七個文件,經過對比複製完成便可:git
proc.c default_pmm.c pmm.c swap_fifo.c vmm.c trap.c sche.c
根據試驗要求,咱們須要對部分代碼進行改進,這裏講須要改進的地方只有一處:github
trap.c() 函數數組
修改的部分以下:數據結構
static void trap_dispatch(struct trapframe *tf) { ++ticks; /* 註銷掉下面這一句 由於這一句被包含在了 run_timer_list() run_timer_list() 在以前的基礎上 加入了對 timer 的支持 */ // sched_class_proc_tick(current); run_timer_list(); }
在完成本練習以前,先說明下什麼是哲學家就餐問題:併發
哲學家就餐問題,即有五個哲學家,他們的生活方式是交替地進行思考和進餐。哲學家們公用一張圓桌,周圍放有五把椅子,每人坐一把。在圓桌上有五個碗和五根筷子,當一個哲學家思考時,他不與其餘人交談,飢餓時便試圖取用其左、右最靠近他的筷子,但他可能一根都拿不到。只有在他拿到兩根筷子時,方能進餐,進餐完後,放下筷子又繼續思考。編輯器
在分析以前,咱們先對信號量有個瞭解,既然要理解信號量的實現方法,咱們能夠先看看信號量的僞代碼:函數
struct semaphore { int count; queueType queue; }; void P(semaphore S){ S.count--; if (S.count<0) { 把進程置爲睡眠態; 將進程的PCB插入到S.queue的隊尾; 調度,讓出CPU; } } void V(semaphore S){ S.count++; if (S.count≤0) { 喚醒在S.queue上等待的第一個進程; } }
基於上訴信號量實現能夠認爲,當多個進程能夠進行互斥或同步合做時,一個進程會因爲沒法知足信號量設置的某條件而在某一位置中止,直到它接收到一個特定的信號(代表條件知足了)。爲了發信號,須要使用一個稱做信號量的特殊變量。爲經過信號量 s 傳送信號,信號量經過 V、P 操做來修改傳送信號量。工具
實驗 7 的主要任務是實現基於信號量和管程去解決哲學家就餐問題,咱們知道,解決哲學家就餐問題須要建立與之相對應的內核線程,而全部內核線程的建立都離不開 pid 爲 1 的那個內核線程——idle,此時咱們須要去尋找在實驗 4 中討論過的地方,如何建立並初始化 idle 這個內核線程。
在實驗 7 中,具體的信號量數據結構被定義在(kern/sync/sem.h)中:
typedef struct { int value; wait_queue_t wait_queue; } semaphore_t;
找到相關函數 init_main(kern/process/proc.c,838——863行)
static int init_main(void *arg) { size_t nr_free_pages_store = nr_free_pages(); size_t kernel_allocated_store = kallocated(); int pid = kernel_thread(user_main, NULL, 0); if (pid <= 0) { panic("create user_main failed.\n"); } extern void check_sync(void); check_sync(); // check philosopher sync problem while (do_wait(0, NULL) == 0) { schedule(); } cprintf("all user-mode processes have quit.\n"); assert(initproc->cptr == NULL && initproc->yptr == NULL && initproc->optr == NULL); assert(nr_process == 2); assert(list_next(&proc_list) == &(initproc->list_link)); assert(list_prev(&proc_list) == &(initproc->list_link)); assert(nr_free_pages_store == nr_free_pages()); assert(kernel_allocated_store == kallocated()); cprintf("init check memory pass.\n"); return 0; }
該函數與實驗四基本沒有不一樣之處,惟一的不一樣在於它調用了 check_sync() 這個函數去執行了哲學家就餐問題。
咱們分析 check_sync 函數(kern/sync/check_sync.c,182+行):
void check_sync(void) { int i; //check semaphore sem_init(&mutex, 1); for(i=0;i<N;i++) { //N是哲學家的數量 sem_init(&s[i], 0); //初始化信號量 int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0);//線程須要執行的函數名、哲學家編號、0表示共享內存 //建立哲學家就餐問題的內核線程 if (pid <= 0) { //建立失敗的報錯 panic("create No.%d philosopher_using_semaphore failed.\n"); } philosopher_proc_sema[i] = find_proc(pid); set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc"); } //check condition variable monitor_init(&mt, N); for(i=0;i<N;i++){ state_condvar[i]=THINKING; int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_condvar failed.\n"); } philosopher_proc_condvar[i] = find_proc(pid); set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc"); } }
經過觀察函數的註釋,咱們發現,這個 check_sync 函數被分爲了兩個部分,第一部分使用了信號量來解決哲學家就餐問題,第二部分則是使用管程的方法。所以,練習 1 中咱們只須要關注前半段。
首先觀察到利用 kernel_thread 函數建立了一個哲學家就餐問題的內核線程(kern/process/proc.c,270——280行)
int kernel_thread(int (*fn)(void *), void *arg, uint32_t clone_flags) { struct trapframe tf; //中斷相關 memset(&tf, 0, sizeof(struct trapframe)); tf.tf_cs = KERNEL_CS; tf.tf_ds = tf.tf_es = tf.tf_ss = KERNEL_DS; tf.tf_regs.reg_ebx = (uint32_t)fn; tf.tf_regs.reg_edx = (uint32_t)arg; tf.tf_eip = (uint32_t)kernel_thread_entry; return do_fork(clone_flags | CLONE_VM, 0, &tf); }
簡單的來講,這個函數須要傳入三個參數:
其他地方則是設置一些寄存器的值,保留須要執行的函數開始執行的地址,以便建立了新的內核線程以後,函數可以在內核線程中找到入口地址,執行函數功能。
接下來,讓咱們來分析須要建立的內核線程去執行的目標函數 philosopher_using_semaphore(kern/sync/check_sync.c,52——70行)
int philosopher_using_semaphore(void * arg)/* i:哲學家號碼,從0到N-1 */ { int i, iter=0; i=(int)arg; //傳入的參數轉爲 int 型,表明哲學家的編號 cprintf("I am No.%d philosopher_sema\n",i); while(iter++<TIMES) /* 無限循環 在這裏咱們取了 TIMES=4*/ { cprintf("Iter %d, No.%d philosopher_sema is thinking\n",iter,i);// 哲學家正在思考 do_sleep(SLEEP_TIME);//等待 phi_take_forks_sema(i);// 須要兩隻叉子,或者阻塞 cprintf("Iter %d, No.%d philosopher_sema is eating\n",iter,i);// 進餐 do_sleep(SLEEP_TIME); phi_put_forks_sema(i);// 把兩把叉子同時放回桌子 } //哲學家思考一段時間,吃一段時間飯 cprintf("No.%d philosopher_sema quit\n",i); return 0; }
參數及其分析:
從這個函數,咱們看到,哲學家須要思考一段時間,而後吃一段時間的飯,這裏面的「一段時間」就是經過系統調用 sleep 實現的,內核線程調用 sleep,而後這個線程休眠指定的時間,從某種方面模擬了吃飯和思考的過程。
如下是 do_sleep 的實現:(kern/process/proc.c,922+行)
int do_sleep(unsigned int time) { if (time == 0) { return 0; } bool intr_flag; local_intr_save(intr_flag);//關閉中斷 timer_t __timer, *timer = timer_init(&__timer, current, time); //聲明一個定時器,並將其綁定到當前進程 current 上 current->state = PROC_SLEEPING; current->wait_state = WT_TIMER; add_timer(timer); local_intr_restore(intr_flag); schedule(); del_timer(timer); return 0; }
咱們看到,睡眠的過程當中是沒法被打斷的,符合咱們通常的認識,由於它在計時器使用的過程當中經過 local_intr_save 關閉了中斷,且利用了 timer_init 定時器函數,去記錄指定的時間(傳入的參數time),且在這個過程當中,將進程的狀態設置爲睡眠,調用函數 add_timer 將綁定該進程的計時器加入計時器隊列。當計時器結束以後,打開中斷,恢復正常。
而反過來看傳入的參數,即爲定時器的定時值 time,在上一層函數中,傳入的是 kern/sync/check_sync.c,14 行的宏定義,TIME 的值爲 10。
相關的圖解以下:
目前看來,最關鍵的函數是 phi_take_forks_sema(i) 和 phi_take_forks_sema(i);
phi_take_forks_sema、phi_take_forks_sema 函數以下所示:(kern/sync/check_sync,c,34——50行)
void phi_take_forks_sema(int i) /* i:哲學家號碼從 0 到 N-1 */ { down(&mutex); /* 進入臨界區 */ state_sema[i]=HUNGRY; /* 記錄下哲學家 i 飢餓的事實 */ phi_test_sema(i); /* 試圖獲得兩隻叉子 */ up(&mutex); /* 離開臨界區 */ down(&s[i]); /* 若是得不到叉子就阻塞 */ } void phi_put_forks_sema(int i) /* i:哲學家號碼從 0 到 N-1 */ { down(&mutex); /* 進入臨界區 */ state_sema[i]=THINKING; /* 哲學家進餐結束 */ phi_test_sema(LEFT); /* 看一下左鄰居如今是否能進餐 */ phi_test_sema(RIGHT); /* 看一下右鄰居如今是否能進餐 */ up(&mutex); /* 離開臨界區 */ }
參數解釋:
其中,mutex 的數據類型是「信號量結構體」,其定義在 kern/sync/sem.h 中:
typedef struct { int value; wait_queue_t wait_queue; } semaphore_t;
如今來到了最關鍵的核心問題解決部分,首先是 down 和 up 操做:(kern/sync/sem.c,16——54行)
static __noinline void __up(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; local_intr_save(intr_flag);//關閉中斷 { wait_t *wait; if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) {//沒有進程等待 sem->value ++; //若是沒有進程等待,那麼信號量加一 } //有進程在等待 else { //不然喚醒隊列中第一個進程 assert(wait->proc->wait_state == wait_state); wakeup_wait(&(sem->wait_queue), wait, wait_state, 1);//將 wait_queue 中等待的第一個 wait 刪除,並將該進程喚醒 } } local_intr_restore(intr_flag); //開啓中斷,正常執行 }
up 函數的做用是:首先關中斷,若是信號量對應的 wait queue 中沒有進程在等待,直接把信號量的 value 加一,而後開中斷返回;若是有進程在等待且進程等待的緣由是 semophore 設置的,則調用 wakeup_wait 函數將 waitqueue 中等待的第一個 wait 刪除,且把此 wait 關聯的進程喚醒,最後開中斷返回。
static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) { bool intr_flag; local_intr_save(intr_flag); //關閉中斷 if (sem->value > 0) { //若是信號量大於 0,那麼說明信號量可用,所以能夠分配給當前進程運行,分配完以後關閉中斷 sem->value --;//直接讓 value 減一 local_intr_restore(intr_flag);//打開中斷返回 return 0; } //當前信號量value小於等於0,代表沒法得到信號量 wait_t __wait, *wait = &__wait; wait_current_set(&(sem->wait_queue), wait, wait_state);//將當前的進程加入到等待隊列中 local_intr_restore(intr_flag);//打開中斷 //若是信號量數值小於零,那麼須要將當前進程加入等待隊列並調用 schedule 函數查找下一個能夠被運行調度的進程,此時,若是可以查到,那麼喚醒,並將其中隊列中刪除並返回 schedule();//運行調度器選擇其餘進程執行 local_intr_save(intr_flag);//關中斷 wait_current_del(&(sem->wait_queue), wait);//被 V 操做喚醒,從等待隊列移除 local_intr_restore(intr_flag);//開中斷 if (wait->wakeup_flags != wait_state) { return wait->wakeup_flags; } return 0; }
down 函數的做用是:首先關掉中斷,而後判斷當前信號量的 value 是否大於 0。若是是 >0,則代表能夠得到信號量,故讓 value 減一,並打開中斷返回便可;若是不是 >0,則代表沒法得到信號量,故須要將當前的進程加入到等待隊列中,並打開中斷,而後運行調度器選擇另一個進程執行。若是被 V 操做喚醒,則把自身關聯的 wait 從等待隊列中刪除(此過程須要先關中斷,完成後開中斷)。
其中,這裏調用了 local_intr_save 和 local_intr_restore 兩個函數,它們被定義在(kern/sync/sync.h,11——25行):
static inline bool __intr_save(void) { //臨界區代碼 if (read_eflags() & FL_IF) { intr_disable(); return 1; } return 0; } static inline void __intr_restore(bool flag) { if (flag) { intr_enable(); } }
很容易發現他們的功能是關閉和打開中斷。
分析完了 up 和 down,讓咱們來分析一下 test 函數:
phi_test_sema(LEFT); /* 看一下左鄰居如今是否能進餐 */ phi_test_sema(RIGHT); /* 看一下右鄰居如今是否能進餐 */
該函數被定義在(kern/sync/check_sync.c,86——94行):
void phi_test_sema(i) { if(state_sema[i]==HUNGRY&&state_sema[LEFT]!=EATING &&state_sema[RIGHT]!=EATING) { state_sema[i]=EATING; up(&s[i]); } }
在試圖得到筷子的時候,函數的傳入參數爲 i,即爲哲學家編號,此時,他本身爲 HUNGRY,並且試圖檢查旁邊兩位是否都在吃。若是都不在吃,那麼能夠得到 EATING 的狀態。
在從吃的狀態返回回到思考狀態的時候,須要調用兩次該函數,傳入的參數爲當前哲學家左邊和右邊的哲學家編號,由於他試圖喚醒左右鄰居,若是左右鄰居知足條件,那麼就能夠將他們設置爲 EATING 狀態。
其中,LEFT 和 RIGHT 的定義以下:
#define LEFT (i-1+N)%N #define RIGHT (i+1)%N
因爲哲學家坐圓桌,所以可使用餘數直接獲取左右編號。
練習一的整體執行流程以下:
請在實驗報告中給出內核級信號量的設計描述,並說其大體執行流流程。
實現了內核級信號量機制的函數均定義在 sem.c 中,所以對上述這些函數分析總結以下:
sem_init
:對信號量進行初始化的函數,根據在原理課上學習到的內容,信號量包括了等待隊列和一個整型數值變量,該函數只須要將該變量設置爲指定的初始值,而且將等待隊列初始化便可;__up
:對應到了原理課中說起到的 V 操做,表示釋放了一個該信號量對應的資源,若是有等待在了這個信號量上的進程,則將其喚醒執行;結合函數的具體實現能夠看到其採用了禁用中斷的方式來保證操做的原子性,函數中操做的具體流程爲:
__down
:一樣對應到了原理課中說起的P操做,表示請求一個該信號量對應的資源,一樣採用了禁用中斷的方式來保證原子性,具體流程爲:
up, down
:對 __up, __down
函數的簡單封裝;try_down
:不進入等待隊列的 P 操做,即時是獲取資源失敗也不會堵塞當前進程;請在實驗報告中給出給用戶態進程/線程提供信號量機制的設計方案,並比較說明給內核級提供信號量機制的異同。
將內核信號量機制遷移到用戶態的最大麻煩在於,用於保證操做原子性的禁用中斷機制、以及 CPU 提供的 Test and Set 指令機制都只能在用戶態下運行,而使用軟件方法的同步互斥又至關複雜,這就使得無法在用戶態下直接實現信號量機制;因而,爲了方便起見,能夠將信號量機制的實現放在 OS 中來提供,而後使用系統調用的方法統一提供出若干個管理信號量的系統調用,分別以下所示:
給內核級線程提供信號量機制和給用戶態進程/線程提供信號量機制的異同點在於:
首先掌握管程機制,而後基於信號量實現完成條件變量實現,而後用管程機制實現哲學家就餐問題的解決方案(基於條件變量)。
一個管程定義了一個數據結構和能爲併發進程所執行(在該數據結構上)的一組操做,這組操做能同步進程和改變管程中的數據。
管程主要由這四個部分組成:
管程至關於一個隔離區,它把共享變量和對它進行操做的若干個過程圍了起來,全部進程要訪問臨界資源時,都必須通過管程才能進入,而管程每次只容許一個進程進入管程,從而須要確保進程之間互斥。
但在管程中僅僅有互斥操做是不夠用的。進程可能須要等待某個條件 C 爲真才能繼續執行。
所謂條件變量,即將等待隊列和睡眠條件包裝在一塊兒,就造成了一種新的同步機制,稱爲條件變量。一個條件變量 CV 可理解爲一個進程的等待隊列,隊列中的進程正等待某個條件C變爲真。每一個條件變量關聯着一個斷言 "斷言" PC。當一個進程等待一個條件變量,該進程不算做佔用了該管程,於是其它進程能夠進入該管程執行,改變管程的狀態,通知條件變量 CV 其關聯的斷言 PC 在當前狀態下爲真。
於是條件變量兩種操做以下:
wait_cv
: 被一個進程調用,以等待斷言 PC 被知足後該進程可恢復執行。進程掛在該條件變量上等待時,不被認爲是佔用了管程。若是條件不能知足,就須要等待。signal_cv
:被一個進程調用,以指出斷言 PC 如今爲真,從而能夠喚醒等待斷言 PC 被知足的進程繼續執行。若是條件能夠知足,那麼能夠運行。在 ucore 中,管程數據結構被定義在(kern/sync/monitor.h)中:
// 管程數據結構 typedef struct monitor{ // 二值信號量,用來互斥訪問管程,只容許一個進程進入管程,初始化爲 1 semaphore_t mutex; // 二值信號量 用來互斥訪問管程 //用於進程同步操做的信號量 semaphore_t next;// 用於條件同步(進程同步操做的信號量),發出 signal 操做的進程等條件爲真以前進入睡眠 // 睡眠的進程數量 int next_count;// 記錄睡在 signal 操做的進程數 // 條件變量cv condvar_t *cv;// 條件變量 } monitor_t;
管程中的成員變量 mutex 是一個二值信號量,是實現每次只容許一個進程進入管程的關鍵元素,確保了互斥訪問性質。
管程中的條件變量 cv 經過執行 wait_cv,會使得等待某個條件 C 爲真的進程可以離開管程並睡眠,且讓其餘進程進入管程繼續執行;而進入管程的某進程設置條件 C 爲真並執行 signal_cv 時,可以讓等待某個條件 C 爲真的睡眠進程被喚醒,從而繼續進入管程中執行。
管程中的成員變量信號量 next 和整形變量 next_count 是配合進程對條件變量 cv 的操做而設置的,這是因爲發出signal_cv 的進程 A 會喚醒睡眠進程 B,進程 B 執行會致使進程 A 睡眠,直到進程 B 離開管程,進程 A 才能繼續執行,這個同步過程是經過信號量 next 完成的;
而 next_count 表示了因爲發出 singal_cv 而睡眠的進程個數。
其中,條件變量 cv 的數據結構也被定義在同一個位置下:
// 條件變量數據結構 typedef struct condvar{ // 用於條件同步 用於發出 wait 操做的進程等待條件爲真以前進入睡眠 semaphore_t sem; //用於發出 wait_cv 操做的等待某個條件 C 爲真的進程睡眠 // 記錄睡在 wait 操做的進程數(等待條件變量成真) int count; //在這個條件變量上的睡眠進程的個數 // 所屬管程 monitor_t * owner; //此條件變量的宿主管程 } condvar_t;
條件變量的定義中也包含了一系列的成員變量,信號量 sem 用於讓發出 wait_cv 操做的等待某個條件 C 爲真的進程睡眠,而讓發出 signal_cv 操做的進程經過這個 sem 來喚醒睡眠的進程。count 表示等在這個條件變量上的睡眠進程的個數。owner 表示此條件變量的宿主是哪一個管程。
其實原本條件變量中須要有等待隊列的成員,以表示有多少線程由於當前條件得不到知足而等待,但這裏,直接採用了信號量替代,由於信號量數據結構中也含有等待隊列。
咱們對管程進行初始化操做:
// 初始化管程 void monitor_init (monitor_t * mtp, size_t num_cv) { int i; assert(num_cv>0); mtp->next_count = 0; // 睡在 signal 進程數 初始化爲 0 mtp->cv = NULL; sem_init(&(mtp->mutex), 1); // 二值信號量 保護管程 使進程訪問管程操做爲互斥的 sem_init(&(mtp->next), 0); // 條件同步信號量 mtp->cv =(condvar_t *) kmalloc(sizeof(condvar_t)*num_cv); // 獲取一塊內核空間 放置條件變量 assert(mtp->cv!=NULL); for(i=0; i<num_cv; i++){ mtp->cv[i].count=0; sem_init(&(mtp->cv[i].sem),0); mtp->cv[i].owner=mtp; } }
那麼如今開始解決哲學家就餐問題,使用管程,它的實如今(kern/sync/check_sync,199+行)
monitor_init(&mt, N); //初始化管程 for(i=0;i<N;i++){ state_condvar[i]=THINKING; int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0); if (pid <= 0) { panic("create No.%d philosopher_using_condvar failed.\n"); } philosopher_proc_condvar[i] = find_proc(pid); set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc"); }
咱們發現,這個實現過程和使用信號量無差異,不一樣之處在於,各個線程所執行的函數不一樣,此處執行的爲 philosopher_using_condvar 函數:
philosopher_using_condvar 函數被定義在(kern/sync/check_sync,162——180行)
int philosopher_using_condvar(void * arg) { /* arg is the No. of philosopher 0~N-1*/ int i, iter=0; i=(int)arg; cprintf("I am No.%d philosopher_condvar\n",i); while(iter++<TIMES) { /* iterate*/ cprintf("Iter %d, No.%d philosopher_condvar is thinking\n",iter,i); /* thinking*/ do_sleep(SLEEP_TIME); phi_take_forks_condvar(i); /* need two forks, maybe blocked */ cprintf("Iter %d, No.%d philosopher_condvar is eating\n",iter,i); /* eating*/ do_sleep(SLEEP_TIME); phi_put_forks_condvar(i); /* return two forks back*/ } cprintf("No.%d philosopher_condvar quit\n",i); return 0; }
咱們發現這裏和用信號量仍是沒有本質的差異,不一樣之處在於,獲取筷子和放下都使用了不一樣的,配套管程使用的函數 phi_take_forks_condvar 和 phi_put_forks_condvar。
phi_take_forks_condvar 和 phi_put_forks_condvar 被定義在(kern/sync/check_sync,121——159行)
其中,mtp 爲一個管程,聲明於同一文件下的第 108 行,state_convader 數組記錄哲學家的狀態,聲明於第107行。
// 拿刀叉 /* * phi_take_forks_condvar() 函數實現思路: 1. 獲取管程的鎖 2. 將本身設置爲飢餓狀態 3. 判斷當前叉子是否足夠就餐,如不能,等待其餘人釋放資源 4. 釋放管程的鎖 */ void phi_take_forks_condvar(int i) { down(&(mtp->mutex)); //保證互斥操做,P 操做進入臨界區 //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I am hungry // try to get fork // I am hungry state_condvar[i]=HUNGRY; // 飢餓狀態,準備進食 // try to get fork phi_test_condvar(i); //測試哲學家是否能拿到刀叉,若不能拿,則阻塞本身,等其它進程喚醒 if (state_condvar[i] != EATING) { //沒拿到,須要等待,調用 wait 函數 cprintf("phi_take_forks_condvar: %d didn't get fork and will wait\n",i); cond_wait(&mtp->cv[i]); } //--------leave routine in monitor-------------- if(mtp->next_count>0) up(&(mtp->next)); else up(&(mtp->mutex)); }
這個地方的意思是,若是當前管程的等待數量在喚醒了一個線程以後,還有進程在等待,那麼就會喚醒控制當前進程的信號量,讓其餘進程佔有它,若是沒有等待的了,那麼直接釋放互斥鎖,這樣就能夠容許新的進程進入管程了。
// 放刀叉 /* * phi_put_forks_condvar() 函數實現思路: 1. 獲取管程的鎖 2. 將本身設置爲思考狀態 3. 判斷左右鄰居的哲學家是否能夠從等待就餐的狀態中恢復過來 */ void phi_put_forks_condvar(int i) { down(&(mtp->mutex));// P 操做進入臨界區 //--------into routine in monitor-------------- // LAB7 EXERCISE1: YOUR CODE // I ate over // test left and right neighbors // I ate over state_condvar[i]=THINKING;// 思考狀態 // test left and right neighbors // 試試左右兩邊可否得到刀叉 phi_test_condvar(LEFT); phi_test_condvar(RIGHT); //喚醒左右哲學家,試試看他們能不能開始吃 //--------leave routine in monitor-------------- if(mtp->next_count>0)// 有哲學家睡在 signal 操做,則將其喚醒 up(&(mtp->next)); else up(&(mtp->mutex));//離開臨界區 }
和信號量的實現差很少,咱們在拿起筷子和放下的時候,主要都還要喚醒相鄰位置上的哲學家,可是,具體的test操做中,實現有所不一樣。test 函數被定義在(同文件,110——118行)
// 測試編號爲i的哲學家是否能得到刀叉 若是能得到 則將狀態改成正在吃 而且 嘗試喚醒 由於wait操做睡眠的進程 // cond_signal 還會阻塞本身 等被喚醒的進程喚醒本身 void phi_test_condvar (i) { if(state_condvar[i]==HUNGRY&&state_condvar[LEFT]!=EATING &&state_condvar[RIGHT]!=EATING) { cprintf("phi_test_condvar: state_condvar[%d] will eating\n",i); state_condvar[i] = EATING ; cprintf("phi_test_condvar: signal self_cv[%d] \n",i); cond_signal(&mtp->cv[i]); //若是能夠喚醒,那麼signal操做掉表明這個哲學家那個已經睡眠等待的進程。和wait是對應的。 } }
上述這一過程能夠被描述爲以下的流程圖:
哲學家->試試拿刀叉->能拿->signal 喚醒被wait阻塞的進程->阻塞本身 | | A | V | ->不能拿->wait阻塞本身 | | 哲學家->放刀叉->讓左右兩邊試試拿刀叉->有哲學家睡在signal 喚醒他
如今看來,最主要的部分在於管程的 signal 和 wait 操做,ucore 操做系統中對於 signal 和 wait 操做的實現是有專門的函數的,它們是 cond_signal 和 cond_wait(kern/sync/monitor.c,26——72行,代碼實現部分)
// 管程signal操做 /* 分支1. 由於條件不成立而睡眠的進程計數小於等於0 時 說明 沒有進程須要喚醒 則直接返回 分支2. 由於條件不成立而睡眠的進程計數大於0 說明有進程須要喚醒 就將其喚醒 同時設置 條件變量所屬管程的 next_count 加1 以用來告訴 wait操做 有進程睡在了 signal操做上 而後本身將本身阻塞 等待條件同步 被喚醒 被喚醒後 睡在 signal 操做上的進程應該減小 故 next_count 應減 1 */ void cond_signal (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); //這是一個輸出信息的語句,能夠無論 /* * cond_signal() 函數實現思路: 1. 判斷條件變量的等待隊列是否爲空 2. 修改 next 變量上等待進程計數,跟下一個語句不能交換位置,爲了獲得互斥訪問的效果,關鍵在於訪問共享變量的時候,管程中是否只有一個進程處於 RUNNABLE 的狀態 3. 喚醒等待隊列中的某一個進程 4. 把本身等待在 next 條件變量上 5. 當前進程被喚醒,恢復 next 上的等待進程計數 */ if(cvp->count>0) { cvp->owner->next_count ++; //管程中睡眠的數量 up(&(cvp->sem)); //喚醒在條件變量裏睡眠的進程 down(&(cvp->owner->next)); //將在管程中的進程睡眠 cvp->owner->next_count --; } cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
首先判斷 cvp.count,若是不大於 0,則表示當前沒有睡眠在這一個條件變量上的進程,所以就沒有被喚醒的對象了,直接函數返回便可,什麼也不須要操做。
若是大於 0,這表示當前有睡眠在該條件變量上的進程,所以須要喚醒等待在cv.sem上睡眠的進程。而因爲只容許一個進程在管程中執行,因此一旦進程 B 喚醒了別人(進程A),那麼本身就須要睡眠。故讓 monitor.next_count 加一,且讓本身(進程B)睡在信號量 monitor.next(宿主管程的信號量)上。若是睡醒了,這讓 monitor.next_count 減一。
這裏爲何最後要加一個 next_conut--
呢?這說明上一句中的 down 的進程睡醒了,那麼睡醒,就必然是另一個進程喚醒了它,由於只能有一個進程在管程中被 signal,若是有進程調用了 wait,那麼必然須要 signal 另一個進程,咱們能夠從下圖能夠看到這一調用過程:
咱們來看 wait 函數:
// 管程wait操做 /* 先將 由於條件不成立而睡眠的進程計數加1 分支1. 當 管程的 next_count 大於 0 說明 有進程睡在了 signal 操做上 咱們將其喚醒 分支2. 當 管程的 next_count 小於 0 說明 當前沒有進程睡在 signal 操做數 只須要釋放互斥體 而後 再將 自身阻塞 等待 條件變量的條件爲真 被喚醒後 將條件不成立而睡眠的進程計數減1 由於如今成立了 */ void cond_wait (condvar_t *cvp) { //LAB7 EXERCISE1: YOUR CODE cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); /* * cond_wait() 函數實現思路: 1. 修改等待在條件變量的等待隊列上的進程計數 2. 釋放鎖 3. 將本身等待在條件變量上 4. 被喚醒,修正等待隊列上的進程計數 */ cvp->count++; //條件變量中睡眠的進程數量加 1 if(cvp->owner->next_count > 0) up(&(cvp->owner->next)); //若是當前有進程正在等待,且睡在宿主管程的信號量上,此時須要喚醒,讓該調用了 wait 的睡,此時就喚醒了,對應上面討論的狀況。這是一個同步問題。 else up(&(cvp->owner->mutex)); //若是沒有進程睡眠,那麼當前進程沒法進入管程的緣由就是互斥條件的限制。所以喚醒 mutex 互斥鎖,表明如今互斥鎖被佔用,此時,再讓進程睡在宿主管程的信號量上,若是睡醒了,count--,誰喚醒的呢?就是前面的 signal 啦,這實際上是一個對應關係。 down(&(cvp->sem)); //由於條件不知足,因此主動調用 wait 的進程,會睡在條件變量 cvp 的信號量上,是條件不知足的問題;而由於調用 signal 喚醒其餘進程而致使自身互斥睡眠,會睡在宿主管程 cvp->owner 的信號量上,是同步的問題。兩個有區別,不要混了,超級重要鴨!!! cvp->count --; cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count); }
若是進程 A 執行了 cond_wait 函數,表示此進程等待某個條件 C 不爲真,須要睡眠。所以表示等待此條件的睡眠進程個數 cv.count 要加一。接下來會出現兩種狀況。
狀況一:若是 monitor.next_count 若是大於 0,表示有大於等於 1 個進程執行 cond_signal 函數且睡着了,就睡在了 monitor.next 信號量上。假定這些進程造成 S 進程鏈表。所以須要喚醒 S 進程鏈表中的一個進程 B。而後進程 A 睡在 cv.sem 上,若是睡醒了,則讓 cv.count 減一,表示等待此條件的睡眠進程個數少了一個,可繼續執行。
狀況二:若是 monitor.next_count 若是小於等於 0,表示目前沒有進程執行 cond_signal 函數且睡着了,那須要喚醒的是因爲互斥條件限制而沒法進入管程的進程,因此要喚醒睡在 monitor.mutex 上的進程。而後進程 A 睡在 cv.sem 上,若是睡醒了,則讓 cv.count 減一,表示等待此條件的睡眠進程個數少了一個,可繼續執行了!
關於條件變量機制的實現主要位於 monitor.c 文件中的 cond_signal, cond_wait
兩個函數中,這兩個函數的含義分別表示提醒等待在這個條件變量上的進程恢復執行,以及等待在這個條件變量上,直到有其餘進行將其喚醒位置,所以對上述這些函數分析總結以下:
cond_signal:將指定條件變量上等待隊列中的一個線程進行喚醒,而且將控制權轉交給這個進程;具體執行流程爲:
cond_wait:該函數的功能爲將當前進程等待在指定信號量上,其操做過程爲將等待隊列的計數加1,而後釋放管程的鎖或者喚醒一個next上的進程來釋放鎖(不然會形成管程被鎖死沒法繼續訪問,同時這個操做不能和前面的等待隊列計數加1的操做互換順序,要不不能保證共享變量訪問的互斥性),而後把本身等在條件變量的等待隊列上,直到有signal信號將其喚醒,正常退出函數;
關於使用條件變量來完成哲學家就餐問題的實現中,總共有兩個關鍵函數,以及使用到了 N(哲學家數量)個條件變量,在管程中,還包括了一個限制管程訪問的鎖還有 N 個用於描述哲學家狀態的變量(總共有 EATING, THINKING, HUNGER)三種狀態;
首先分析 phi_take_forks_condvar 函數的實現,該函數表示指定的哲學家嘗試得到本身所須要進餐的兩把叉子,若是不能得到則阻塞,具體實現流程爲:
而 phi_put_forks_condvar 函數則是釋放當前哲學家佔用的叉子,而且喚醒相鄰的由於得不到資源而進入等待的哲學家:
請在實驗報告中給出給用戶態進程/線程提供條件變量機制的設計方案,並比較說明給內核級 提供條件變量機制的異同。
發如今本實驗中管程的實現中互斥訪問的保證是徹底基於信號量的,也就是若是按照上文中的說明使用 syscall 實現了用戶態的信號量的實現機制,那麼就徹底能夠按照相同的邏輯在用戶態實現管程機制和條件變量機制;
固然也能夠仿照用戶態實現條件變量的方式,將對訪問管程的操做封裝成系統調用;
異同點爲:
請在實驗報告中回答:可否不用基於信號量機制來完成條件變量?若是不能,請給出理由, 若是能,請給出設計說明和具體實現。
可以基於信號量來完成條件變量機制;事實上在本實驗中就是這麼完成的,只須要將使用信號量來實現條件變量和管程中使用的鎖和等待隊列便可。
最終的實驗結果以下圖所示:
若是 make grade 沒法滿分,嘗試註釋掉 tools/grade.sh 的 221 行到 233 行(在前面加上「#」)。
這裏咱們選用古老的編輯器 Vim,具體操做過程以下:
:221
跳轉至 221 行;待完成。。。
待完成。。。