進程間通訊

v2-3b32471e938dae81bfd4d93bf9add3d7_1200x500

在本系統中發現了兩個BUG:兩個BUG · Issue #3 · bajdcc/MiniOS,限於本身水平比較渣,無法解決這兩個BUG,那麼OS系列就告一段落了,縱觀整個過程,仍是對IPC機制的理解幫助比較大,這種思想能夠運用於實踐中,如管道、套接字、流的實現。java

---------------------------------------------------------------------------linux

有童鞋問如何從零開始學習代碼,我寫了點總結:git

首先要了解操做系統的知識,從零開始指的是代碼從零開始,因此相關知識的基礎也是不可或缺的。要了解系統的啓動過程,Zyw_OS啓動流程跟實現~未完待續 - 知乎專欄 。從啓動過程來分析代碼,首先是BIOS。github

(1)而後是boot文件夾下的三個asm文件,它們完成讀軟盤、設置GDT、加載內核數據至0x100000,並跳轉到0x100000處執行內核代碼web

(2)隨後根據main.c中初始化的順序學習:(I)vga顯示輸出、(II)gdt分段管理、(III)idt中斷向量設置、(IV)isr中斷處理程序、(V)pmm獲取可用內存並按4KB記錄並保存至棧中,完成物理頁框的申請與釋放、(VI)vmm分頁管理、(VII)sys初始化系統調用函數、(VIII)proc多進程管理,進程切換。編程

(3)從零開始添加功能,如先寫好boot.asm,可以開機並用中斷打印一個A,沒問題以後,再設置gdt,vga以及中斷管理,這時運行一個int 3看看有沒有打印結果出來;這樣每次寫的代碼都可以獨立運行。若是是把全部代碼下載下來運行的話,效果很不錯,可是會不知道從讀起。如今要作的就是精簡代碼,只保留核心功能,摒棄一切雜念。c#

(4)那麼寫代碼過程當中有幾個狀況要處理:(I)不明因此,這時能夠看註釋或是上網查資料(II)這裏代碼是否是有問題?先放着無論(III)運行出錯了,那就debug吧。其中debug的時間比較久,我作到IPC這一部分起碼編程時間100小時以上,不包括平常吃飯時想的時間,因此不要慌,操做系統代碼須要細嚼慢嚥,急也急不得。debug可能用時比較久,這時比較糾結、想放棄、頭腦混亂,由於好多bug真不知道哪冒出來的,用qemu源碼級調試好處也有限。其實這一關過不了,對操做系統的理解水平也就觸到天花板了,也就是隻是理解了書上的思想,而沒有將代碼和思想結合起來。寫代碼、山寨別人的代碼、東拼西湊,無論用什麼方式,只要把bug除了就皆大歡喜。windows

(5)這樣作有好處:代碼、思路,全部的細節所有load到了腦子裏,要啥有啥,也就是真正理解了內核,能夠觸類旁通,並本身更改代碼,添加功能服務器

(6)要有毅力、有恆心,能吃苦,成功不是一蹴而就,別看我寫的代碼運行效果挺好,我起碼debug了100h以上,天天打底調試6小時,最後才能bug弄好。真正本身花時間花工夫寫的代碼,纔會長久的留本身的腦海裏。架構

--------------------------------------------------------------------

寫在前面

Release:bajdcc/MiniOS

總結下當前的進度:

  1. 引導、GDT、中斷、虛頁:花了兩三天
  2. 多進程:花了一週
  3. 進程間通訊:花了兩三天

90%的時間花在了debug上,除完全部bug,已經對整個實現機制瞭如指掌。因此,debug的過程也是一個學習的過程,雖然效率不高,可是效果好(就是單詞抄寫百遍的效果啦)。

90%操做系統的內容 = 枯燥乏味,而10%(餘下的內容) = 精妙絕倫

目前感覺設計精妙的地方:

  1. 進程切換機制(時鐘分派),進程的父子關係,進程的狀態。表明:fork,wait,sleep,exit,kill,如wait和exit的搭配爲例,進程的銷燬是惰性的(銷燬操做集中於父進程中的wait)。
  2. 進程間通訊機制。通訊分異步和同步,那麼這裏實現的是同步IPC,比較簡單,用不着寫隊列。這是微內核的基礎,它將某些同類別的系統調用轉化爲惟一一個系統調用0x80。如何區分不一樣功能的調用呢?就是在調用int 0x80前將參數入棧,參數有通訊方式(SEND/RECEIVE),通訊對象(正數表示pid,-1表示任意對象),消息結構

這些精妙的地方只能經過代碼去體會。

用戶代碼

static void halt() {
    while (1);
}

static int delay() {
    volatile int i;

    for (i = 0; i < 0x1000000; i++);
    return i;
}

void user_main() {

    int i;

    i = call(SYS_FORK);

    delay();
    if (i != 0) {
        sys_tasks0();
        delay();
        call(SYS_WAIT);
    } else {
        while (1) {
            delay();
            i = sys_ticks();
            delay();
            printk("!! proc#%d received tick '%d'\n", proc2pid(proc), i);
            delay();
            delay();
            delay();
            delay();
        }
    }

    printk("halt...\n");

    halt();
}

void sys_tasks0() {
    extern uint32_t tick;

    MESSAGE msg;
    while (1) {
        send_recv(RECEIVE, TASK_ANY, &msg);
        int src = msg.source;
        switch (msg.type) {
        case SYS_TICKS:
            msg.RETVAL = tick;
            printk("!! proc #%d sent tick '%d'\n", proc2pid(proc), tick);
            send_recv(SEND, src, &msg);
            break;
        default:
            assert(!"unknown msg type");
            break;
        }
    }
}

static int sys_ipc_call(int type) {
    MESSAGE msg;
    reset_msg(&msg);
    msg.type = type;
    send_recv(BOTH, TASK_SYS, &msg);
    return msg.RETVAL;
}

int sys_ticks() {
    return sys_ipc_call(SYS_TICKS);
}

進程的結構

說到IPC,可能會想到pipe、clipboard、windows message、socket、shared memory、file等方式,然而無法實現那麼多(括弧笑,因此跟着書上走吧~

目前只是抄了書上的代碼(但願儘快看到結果),還沒時間去分析IPC機制的代碼。

首先,咱們建立的進程proc的結構有:

struct proc {
    /* KERNEL */
    struct interrupt_frame *fi;     // 中斷現場
    volatile uint8_t pid;           // 進程ID
    uint32_t size;                  // 用戶空間大小
    uint8_t state;                  // 進程狀態
    char name[PN_MAX_LEN];          // 進程名稱
    pde_t *pgdir;                   // 虛頁目錄(一級頁表)
    char *stack;                    // 進程內核堆棧
    struct proc *parent;            // 父進程
    int8_t ticks;                   // 時間片
    int8_t priority;                // 優先級
    /* IPC */
    int p_flags;                    // 標識
    MESSAGE *p_msg;                 // 消息
    int p_recvfrom;                 // 接收消息的進程ID
    int p_sendto;                   // 發送消息的進程ID
    int has_int_msg;                // nonzero if an INTERRUPT occurred when the task is not ready to deal with it.
    struct proc *q_sending;         // queue of procs sending messages to this proc
    struct proc *next_sending;      // next proc in the sending queue (q_sending)
};

結構很複雜吧?不過,若是是一步步實現功能,往裏添加的話,其實也不算多。

PCB結構:

  1. 進程相關信息:名稱,ID,狀態,父進程
  2. 調度信息:中斷現場,時間片,優先級
  3. 內存信息:代碼空間大小,頁表,內核堆棧
  4. IPC:消息收發狀態flags,消息msg,收發進程ID,消息隊列(鏈表)

進程的切換:

  1. 主進程死循環,經過時鐘中斷,進行調度
  2. 中斷時保存現場(即proc->fi),若是是最外層中斷,那麼起調度做用(此時k_reenter=0),內層中斷k_reenter>0,中斷結束後iret返回,從proc->fi中恢復現場,此時修改相應特權級

微內核架構

原版的linux中有一堆的系統調用,那麼微內核架構與此不一樣,它將系統調用按功能劃分開來,如分紅內存管理、文件管理等,創建專門的系統級進程來負責系統調用。

那麼,也就是 「ring1級系統服務進程」 與 系統 打交道(經過系統調用),而咱們的「ring3級用戶進程」 只要與 「ring1級系統服務進程」 通訊就能夠了。結論:ring3用戶級 <=> ring1服務級 <=> ring0系統級,ring1就像中介同樣,而ring0與ring3能夠素不相識。這樣,微內核架構(至關於微服務)抽象出一個服務層sys_task,下降了耦合度。

  • ring1與ring0打交道:經過系統調用便可
  • ring1與ring3打交道:維護一個消息等待隊列

進程間通訊

主要分兩個函數msg_send和msg_receive。

收/發消息有幾種狀況:

  1. 系統服務監聽消息:沒消息時休眠,來消息時喚醒
  2. 系統服務發送消息:僅當系統服務收到用戶的SEND消息後,被喚醒,隨後用戶再發送RECV消息,系統服務收到後設置msg
  3. 用戶進程發送消息:系統服務不可用,用戶進程堵塞,直到系統服務處理完其餘任務,從等待隊列中取出用戶進程,並喚醒用戶進程
  4. 用戶進程接收消息:當用戶進程發送SEND消息收到迴應後,會再發送一個RECV消息,等待系統服務響應並設置msg,最後用戶進程拿到設置後的msg

這裏的操做挺像TCP的握手操做的,概括起來的同步通訊模型

  1. 系統服務:監聽消息(就像web服務器同樣),單線程: while(1){recv(), send()}
  2. 用戶進程:像web客戶端同樣,單線程:{send() recv()}

消息隊列

統一調用接口:

int send_recv(int function, int src_dest, MESSAGE* msg)
{
    int ret = 0, caller;

    caller = proc2pid(proc);

    if (function == RECEIVE)
        memset(msg, 0, sizeof(MESSAGE));

    switch (function) {
    case BOTH: // 先發送再接收
        ret = _sendrec(SEND, src_dest, msg, caller);
        if (ret == 0)
            ret = _sendrec(RECEIVE, src_dest, msg, caller);
        break;
    case SEND:
    case RECEIVE:
        ret = _sendrec(function, src_dest, msg, caller);
        break;
    default:
        assert((function == BOTH) ||
               (function == SEND) || (function == RECEIVE));
        break;
    }

    return ret;
}

對於系統服務service:

  1. send_recv(RECV, TASK_ANY, msg) 監聽消息
  2. 處理msg
  3. send_recv(RECV, msg.source, msg) 發送消息給客戶端

對於客戶端程序client:

  1. 初始化msg
  2. send_recv(SEND, SYSTASK_ID, msg) 發送消息給系統服務
  3. send_recv(RECV, SYSTASK_ID, msg) 堵塞並接收消息
  4. 處理msg

1、發送消息

int msg_send(struct proc* current, int dest, MESSAGE* m)
{
    struct proc* sender = current;
    struct proc* p_dest = npid(dest); /* proc dest */

    /* check for deadlock here */
    if (deadlock(proc2pid(sender), dest)) {
        printk("DEADLOCK! %d --> %d\n", sender->pid, p_dest->pid);
        assert(!"DEADLOCK");
    }

    if ((p_dest->p_flags & RECEIVING) && /* dest is waiting for the msg */
        (p_dest->p_recvfrom == proc2pid(sender) ||
         p_dest->p_recvfrom == TASK_ANY)) {

        memcpy(va2la(dest, p_dest->p_msg),
              va2la(proc2pid(sender), m),
              sizeof(MESSAGE));

        p_dest->p_msg = 0;
        p_dest->p_flags &= ~RECEIVING; /* dest has received the msg */
        p_dest->p_recvfrom = TASK_NONE;
        unblock(p_dest);
    }
    else { /* dest is not waiting for the msg */
        sender->p_flags |= SENDING;
        sender->p_sendto = dest;
        sender->p_msg = m;

        /* append to the sending queue */
        struct proc * p;
        if (p_dest->q_sending) {
            p = p_dest->q_sending;
            while (p->next_sending)
                p = p->next_sending;
            p->next_sending = sender;
        }
        else {
            p_dest->q_sending = sender;
        }
        sender->next_sending = 0;

        block(sender);
    }

    return 0;
}

解釋:

  1. 判斷是否死鎖,即A->send->B,同時B->send->A
  2. 若對方正在監聽消息,則將msg拷貝到對方的p_msg中,並消除對方的監聽與堵塞狀態
  3. 若對方不在監聽消息(可能在處理其餘事務),則將發送方PCB指針插入到對方的q_sending隊列中,並將發送方堵塞以等待接收方的迴應

2、接收消息

int msg_receive(struct proc* current, int src, MESSAGE* m)
{
    struct proc* p_who_wanna_recv = current;
    struct proc* p_from = 0; /* from which the message will be fetched */
    struct proc* prev = 0;
    int copyok = 0;

    if ((p_who_wanna_recv->has_int_msg) &&
        ((src == TASK_ANY) || (src == INTERRUPT))) {
        /* There is an interrupt needs p_who_wanna_recv's handling and
         * p_who_wanna_recv is ready to handle it.
         */

        MESSAGE msg;
        reset_msg(&msg);
        msg.source = INTERRUPT;
        msg.type = HARD_INT;
        assert(m);
        memcpy(va2la(proc2pid(p_who_wanna_recv), m), &msg,
              sizeof(MESSAGE));

        p_who_wanna_recv->has_int_msg = 0;

        return 0;
    }


    /* Arrives here if no interrupt for p_who_wanna_recv. */
    if (src == TASK_ANY) {
        /* p_who_wanna_recv is ready to receive messages from
         * TASK_ANY proc, we'll check the sending queue and pick the
         * first proc in it.
         */
        if (p_who_wanna_recv->q_sending) {
            p_from = p_who_wanna_recv->q_sending;
            copyok = 1;
        }
    }
    else {
        /* p_who_wanna_recv wants to receive a message from
         * a certain proc: src.
         */
        p_from = npid(src);

        if ((p_from->p_flags & SENDING) &&
            (p_from->p_sendto == proc2pid(p_who_wanna_recv))) {
            /* Perfect, src is sending a message to
             * p_who_wanna_recv.
             */
            copyok = 1;

            struct proc* p = p_who_wanna_recv->q_sending;
            while (p) {
                assert(p_from->p_flags & SENDING);
                if (proc2pid(p) == proc2pid(npid(src))) { /* if p is the one */
                    p_from = p;
                    break;
                }
                prev = p;
                p = p->next_sending;
            }

        }
    }

    if (copyok) {
        /* It's determined from which proc the message will
         * be copied. Note that this proc must have been
         * waiting for this moment in the queue, so we should
         * remove it from the queue.
         */
        if (p_from == p_who_wanna_recv->q_sending) { /* the 1st one */
            assert(prev == 0);
            p_who_wanna_recv->q_sending = p_from->next_sending;
            p_from->next_sending = 0;
        }
        else {
            prev->next_sending = p_from->next_sending;
            p_from->next_sending = 0;
        }

        /* copy the message */
        memcpy(va2la(proc2pid(p_who_wanna_recv), m),
              va2la(proc2pid(p_from), p_from->p_msg),
              sizeof(MESSAGE));

        p_from->p_msg = 0;
        p_from->p_sendto = TASK_NONE;
        p_from->p_flags &= ~SENDING;
        unblock(p_from);
    }
    else {  /* nobody's sending TASK_ANY msg */
        /* Set p_flags so that p_who_wanna_recv will not
         * be scheduled until it is unblocked.
         */
        p_who_wanna_recv->p_flags |= RECEIVING;

        p_who_wanna_recv->p_msg = m;

        if (src == TASK_ANY)
            p_who_wanna_recv->p_recvfrom = TASK_ANY;
        else
            p_who_wanna_recv->p_recvfrom = proc2pid(p_from);

        block(p_who_wanna_recv);

    }

    return 0;
}

解釋:

  1. 若接收方發生中斷,則處理中斷,函數當即返回
  2. 若接收方能夠接收一切消息TASK_ANY,那麼此時判斷q_sending發送隊列中是否有消息,是的話,則從隊列中取消息,清除發送方的SENDING狀態;若是此時q_sending中沒有消息,則接收方堵塞,置RECEIVING狀態
  3. 若接收方只接收某一種消息,則當消息不匹配時,接收方堵塞;若消息匹配,進行第2步中的取消息操做

死鎖的簡單判斷:

因爲q_sending隊列表示等待隊列,只要遍歷它,看是否能夠遍歷到當前進程自己便可。

堵塞的簡單實現:

堵塞意味着要暫停當前進程並切換到其餘進程,然而本系統的實現有限,只能強行觸發時鐘中斷進行進程切換,由此可能致使BUG。

階段性總結

若是說debug是負反饋,那麼proc和ipc的實現就是大大的正反饋,先前用java實現瞭解釋器並構建操做系統(bajdcc/jMiniLang),提供lambda、coroutine、multi-process等機制,但效率極低,求個一百內素數都要半天,仍是無法完成作一個操做系統的願望。原本用C/C++/ java/C# 也造了好多好多輪子,那麼此次實現操做系統只用到了ASM和C,可是!!!難度非同小可!由於:資料貧乏、機制複雜、陷阱衆多、難以調試、理解困難等等……但我沒有放棄!!但看來IPC運行良好沒有panic的時候,個人心裏是很是喜悅的!這大概就是編程的美吧!

https://zhuanlan.zhihu.com/p/26054925備份。

相關文章
相關標籤/搜索