一種基於優先權的低層次搶佔式調度器源碼分析

前言

調度器(scheduler)是計算機操做系統內核中對進程/線程分配CPU計算資源的重要模塊react

調度器功能

在桌面機,嵌入式設備或大型機等不一樣的應用環境中,產生了不一樣類型資源分配策略的調度器,針對不一樣層面分爲如下幾種調度器算法

  • 高級調度器(long-term scheduler): 一般用於面向批處理的多程序環境中,經過均衡內存中的任務以優化系統資源,包括CPU,內存和硬盤存儲等。
  • 中級調度器(medium-term scheduler): 跟蹤CPU上執行的進程/線程的動態內存用量來決定是否要增長或減小「多通道程度「(內存中競爭CPU的進程個數),以防止」顛簸「。顛簸即當前的進程集合的內存需求超過了系統容量,致使進程在各自的執行中變得緩慢。
  • 低級調度器(short-term scheduler): 負責在當前駐留的內存裏的進程/線程中選擇一個來運行。這篇內容設計的算法主要關注的都是低級調度器

在以前的項目工做中研究了一種部署在Xilinx MicroBlaze 嵌入式平臺中的低級調度器,計劃按如下四個部分介紹該調度器的具體實現和改進實驗編程

  1. 調度器源碼分析
  2. 基於VEGA織女星NXP-RVM1-Series RI5CY平臺的調度器移植實驗
  3. 基於中斷時間片的調度器改進實驗
  4. 調度器性能分析

本篇文章爲第一部分將分析該調度器的源碼實現。 在低級調度器中,通常劃分爲:非搶佔式搶佔式。非搶佔式的調度器中,一個進程/線程或採用執行到底策略,或主動釋放資源放棄佔用處理器時間以處理I/O請求,調度器只負責安排進程/線程執行順序;而搶佔式調度器則從主動從當前線程手中把處理器計算資源搶走,並交給另外一個進程/線程使用。無論是哪一類調度器都會按照如下四項開展調度工做:數組

  • 得到處理器的控制權
  • 保存當前正在運行的進程/線程狀態(PCB,process control block)
  • 選擇一個新的進程/線程來執行
  • 提交新選擇出來的進程/線程給處理器運行,將該進程/線程的PCB狀態加載到CPU寄存器中

進程/線程狀態包含了程序執行中的關鍵信息,例如當前程序執行位置(PC指針),過程調用(sub-routine)返回地址,程序在存儲空間的現場(context,程序上下文)等等,是調度器執行調度任務的重要信息部分。緩存

源碼分析

基本結構

調度器由如下6個基本類組成安全

類名 功能說明
thread_dt 裝載任務線程thread的運行狀態(PCB)
threadext_dt 指定任務線程thread的棧(stack)空間,管理線程函數的函數指針及參數
contexqueue 管理PCB線程池的單向鏈表隊列
thread_lib 調度器的核心類,包含調度器線程管理的全部方法和線程資源池
event_dt 線程事件的實現類,包含事件的實現方法及操做函數
semaphore_dt 信號量旗語的實現類,包含信號量的實現及操做方法

class thread_lib是整個調度器的核心類,下圖爲整個調度器的組織結構bash

調度器結構關係

代碼分析

  • thread_dt類

class thread_dt是上文所提到的PCB內容的容器,實現以下:多線程

class thread_dt
{
public:
#ifdef WIN32
    LPVOID thread_fiber;
#else
    uint32 sp;             //棧指針
    union
    {
        uint32 reg[15];    //部分CPU通用寄存器保存
        struct
        {
            uint32 r15;    //sub-routine結束後的返回地址, 在microblaze體系中,r15用於保存返回地址
            uint32 r17;
            uint32 r19;
            uint32 r20;
            uint32 r21;
            uint32 r22;
            uint32 r23;
            uint32 r24;
            uint32 r25;
            uint32 r26;
            uint32 r27;
            uint32 r28;
            uint32 r29;
            uint32 r30;
            uint32 r31;
        };
    };
#endif
    threadext_dt *extdat;    //線程任務的操做類指針
    thread_dt *next;         //線程池鏈表的尾指針
       uint32 priority;         //任務的優先級
#ifdef PROBE_LOG
    uint32 count;
    char funcname[12];
#endif
};

class thread_dt保存了線程運行的全部關鍵現場信息,包括如下5部分架構

  1. 部分CPU通用寄存器(GPR, General Purpose Register)的當前狀態
  2. 程序計數器的當前指令執行地址,用與產生sub-routine結束後的返回地址
  3. 棧(Stack)指針
  4. 線程任務的操做內容對象指針
  5. 線程任務優先級

經過上述五部分所描述的線程上下文現場信息,處理器能夠切換到指定sub-routine執行新的任務。一般寄存器文件信息只需保存兩類寄存器做爲關鍵現場信息:由Saving RegistersTemporry Registersapp

microbalze寄存器功能描述

按上圖給出的定義因此對線程執行現場的保存須要保存r15,r17,r19-r31等15個必要GPR寄存器

  • threadext_dt類

class threadext_dt是線程任務函數的容器,其實現代碼以下:

class threadext_dt
{
public:
    static const uint32 thread_stack_size = 520284; //Near 512k. Not exactly 512k to avoid cache collision.  棧空間大小
#ifndef WIN32
    uint8 astack[thread_stack_size];  //棧空間
#endif
    thread_fn thread_start;           //線程任務函數指針
    void *thread_arg;                 //線程任務函數參數
};

class threadext_dt 保存的內容主要有三點:

  1. 線程的棧空間內容
  2. 任務線程函數指針,該指針在具體的線程任務建立時須要和實際的執行函數地址綁定,所支持的線程函數格式要求以下:
    void (*thread_fn)(void *arg)
  3. 任務線程函數的參數列表指針

線程空間所指定的棧空間大小比cache的物理尺寸(512KB)少一個內存頁框(page frame, 4KB)的大小。這種設計目的在於當cache基於4KB大小的cacheline作刷新時,cache經過DataBus總線訪問主存的最大單次數據傳輸寬度爲256bit*16等於4KB。當cache miss時,cache可直接使用這個多餘的空cacheline從主存調入新塊,並更新CAM表映射,上述狀況可在Data Cache Miss的數據量小於一個物理頁範圍時減小一次Cache Line的Write Back操做,從而減小Cache Miss帶來的訪存延遲懲罰(penalty)。因爲上述線程的保存並無利用線程任務的PID,所以在線程任務在切換時,前面線程存放於cache的數據對當前線程均miss,所以在存儲切換時可能有較大因爲cache warm up所帶來的訪存顛簸。

  • contextqueue類

class contexqueue實現了對線程池的管理功能, 實現以下:

class contextqueue
{
private:
    thread_dt* volatile head;
    thread_dt* volatile * volatile tail;

public:
    inline void init()
    {
        head = NULL;
        tail = &head;
    }

    inline thread_dt* volatile current()
    {
        return head;
    }

    inline void insert(thread_dt* c)
    {
        c->next = NULL;
        *tail = c;
        tail = &c->next;
    }

    inline void inserthead(thread_dt* c)
    {
        c->next = head;
        if(head == NULL)
        {
            tail = &c->next;
        }
        head = c;
    }

    inline void batchinsert(contextqueue &addqueue)
    {
        ASSERT(addqueue.head != NULL,LEVEL_NORMAL);
        *tail = addqueue.head;
        tail = addqueue.tail;
    }

    inline void remove()
    {
        if ((head = head->next) == NULL)
        {
            tail = &head;
        }
    }

    inline void removeall()
    {
        init();
    }

    inline void rotate()
    {
        ASSERT(head != NULL,LEVEL_NORMAL);
        *tail = head;
        tail = &head->next;
        head = head->next;
        *tail = NULL;
    }
};

上述線程池爲單向鏈表結構,並提供了8種資源管理方法

函數名 功能簡介
init() 線程池的初始化方法
current() 返回線程池鏈表的頭部元素指針
insert() 在線程池鏈表尾部插入新的線程元素
inserthead() 在線程池鏈表的頭部插入新的線程元素
batchinsert() 在線程池鏈表的尾部插入新的線程池鏈表
remove() 刪除線程池鏈表的頭部元素
removeall() 刪除整個線程池鏈表(從新初始化線程池)
retate() 線程池的初始化方法
init() 線程池的初始化方法

線程池init方法

init

線程池的初始化方法經過首尾指針構造了一個以class thread_dt對象爲元素的空白單向鏈表結構,頭部指針指向一個NULL對象,尾部的二級指針指向頭部指針地址的位置。

線程池insert方法

insert

線程池對象的插入方法將原線程池尾部元素的後驅指針鏈接到新元素,移動尾部指針的指向新元素的後驅指針位置,尾部元素的後驅指針須要指向NULL

線程池inserthead方法

inserthead

線程池的頭部插入方法將新元素的後驅指針指向原鏈表的頭部,特別在空表插入狀況下,須要將尾部指針定位到新元素的後驅指針地址位置,最後更新頭部指針指向新元素便可

線程池batchinsert方法

batchinsert

對線程池尾部插入一個線程鏈表需將尾指針指向新鏈表的頭部元素,同時移動尾指針執行新鏈表的尾部元素的後驅指針

線程池remove方法

remove

線程池鏈表元素的刪除老是刪除頭部元素,當刪除後爲非空鏈表,只需將頭部指針移向原頭部的後續元素,當出現刪空時,則還須要將尾部指針也指向原頭部後驅指針的地址位置

線程池removeall方法

等同於從新初始化線程池

線程池rotate方法

rotate

線程池的元素旋轉方法是調度器的調度策略的重要操做,目的使原鏈表的頭部元素被替換到尾部,從而實現round-robin的模式,操做時首先將尾部元素的後續指針指向頭部元素,更新尾指針指向原頭部元素的後驅指針地址,而後將頭部指針指向原頭部元素的後驅元素造成新的頭部元素(應注意,旋轉方法操做時應確保鏈表中很多於兩個線程元素,本調度器在流程機制上保證了這項條件), 最後將原頭部元素的後驅指針指向NULL完成旋轉操做

  • thread_lib類

class thread_lib 是調度器的核心代碼,主要實現以下:

class thread_lib
{
public:
    static const uint32 high_priority = 0;
    static const uint32 normal_priority = 1;
    static const uint32 low_priority = 2;
    static const uint32 childthreadintetris=lunsizeintetris*threadcountinlun;
#ifdef PROBE_LOG
    uint32 lasttick;
#endif
private:
    static const uint32 maxthread = tetrissizelimitinsystem*(1u+childthreadintetris);
    static thread_lib instance;
    static threadext_dt extcontext[maxthread];
    thread_dt availablecontext[maxthread];
    contextqueue ready_queue[3];//0:high priority   1:normal priority  2:low priority
    thread_dt main_thread;
    contextqueue spare_queue;
    thread_dt *current;
public:
    static contextqueue& get_readyqueue(uint32 priority)

    static contextqueue& get_currentreadyqueue()

    static void reschedule();

    static inline void init()

    static inline thread_dt* getcurrentcontext()

    static inline void __yield()

    static inline void yield()

    static inline void __lowpriorityyield()

    static inline void lowpriorityyield()

    static inline void sleep(uint32 Millisecond = 250)

    static inline void threadexit()

    static inline void reset_threadpool()

    //create_thread should only be called in thread context. Not in interrupt/dpc context.
#ifdef PROBE_LOG
    static void create_thread(const char* funcname,thread_fn thread_start,void* parg, uint32 priority);
#else
    static void create_thread(thread_fn thread_start,void* parg, uint32 priority);
#endif

#ifdef WIN32
    static VOID CALLBACK run_thread(LPVOID pcontext);
#else
    static void run_thread();
#endif

#ifdef PROBE_LOG
    static inline void thread_printf(void);
#endif

};

class thread_lib提供了調度器工做所必要的成員變量和調度方法

調度器成員變量的簡要說明

變量名 功能說明
lasttick 調度器上一次讀取的timer計數,表明某時刻系統累積的時間計數,相似Linux的jiffies概念
maxthread 定義線程池能支持的最大線程數,這個限制一般取決於平臺系統所定義的PID(Process Identify)字段的寬度,在本例取決於處理器算力及下游處理能力的帶寬極限
instance 所構造的thread lib靜態單例,方便將線程池放置於系統規劃的快速存儲段以加速調度器的調度效能
extcontext 在單例中構造的class threadext_dt靜態實例數組
availablecontext 在單例中構造的class thread_dt靜態實例數組
ready_queue 調度器中已註冊實際操做任務的線程池,分爲high_priority, normal_priority, low_priority三個優先級的獨立線程池
main_thread 調度器的主線程,即main函數產生的線程
spare_queue 在單例中構造的線程池,用於存放調度器未註冊的全部可用空白線程元素
current 調度器當前在執行的線程元素

調度器調度方法的簡要說明


操做函數 功能說明
get_readyqueue 得到指定優先級的線程池實例
get_currentreadyqueue 得到當前運行任務所在優先級的線程池實例
reschedule 調度器重調度方法,按照指定的調度策略將當前執行任務佔用的處理器計算資源釋放,並從ready_queue中選出下一個線程提交處處理器執行
init 調度器的初始化方法
getcurrentcontext 得到當前正在執行的線程元素實例
__yield 普通中斷模式下的處理器計算資源替換方法,讓當前執行的線程任務讓出處理器資源,並交給新的線程任務
yield 快速中斷模式下的處理器資源替換方法,做用與普通模式下相似
__lowpriorityyield 普通中斷模式下將當前任務讓步到low priority線程池隊列的方法
lowpriorityyield 快速中斷模式下將當前任務讓步到low priority線程池的方法
sleep 使當前線程休眠指定時間的間隔,單位ms
threadexit 子線程任務退出執行並返回主線程的方法
reset_threadpool 調度器內部資源的初始化方法
create_thread 註冊線程任務到空白線程元素
run_thread 執行線程任務的統一入口
thread_printf 調度器debug使用的打印函數
linkinterrupt 調度器線程任務與中斷連接方法

調度器的初始化過程

調度器經過init和reset_threadpool兩個函數完成內部資源節點的初始化操做,其中reset_threadpool是init函數調用的子程

reset_threadpool函數的源碼以下:

static inline void reset_threadpool()
    {
#ifdef WIN32
        for (uint32 i=0; i!=maxthread; ++i)
        {
            LPVOID thread_fiber = instance.availablecontext[i].thread_fiber;
            if (thread_fiber != NULL)
            {
                DeleteFiber(thread_fiber);
            }
        }
#endif
        //memset(&instance,0,sizeof instance);
        memset(&instance,0,sizeof(instance));
        memset(extcontext,0,sizeof(extcontext));
        for(uint32 i = high_priority; i <= low_priority; ++i)
        {
            instance.ready_queue[i].init();
        }
        instance.spare_queue.init();
        for(uint32 i=0; i!=maxthread; ++i)
        {
            instance.availablecontext[i].extdat=&extcontext[i];
            instance.spare_queue.insert(&instance.availablecontext[i]);
        }
    }

上述代碼操做流程以下

restpool

調度器初始化操做將類中使用靜態資源的內存空間段作歸0操做,並將線程實例(class thread_dt)與線程操做函數實例(class threadext_dt)創建一一綁定關係從而產生可用的空白線程元素,空白線程會加入到spare_queue供調度器隨時拾取。調度器的內部線程資源採用靜態資源的目的在於提升調度器的工做性能,詳細緣由將在下文闡述。

init函數的源碼以下:

static inline void init()
    {
        instance.reset_threadpool();

#ifdef WIN32
        instance.main_thread.thread_fiber = ConvertThreadToFiber(NULL);
        ASSERT(instance.main_thread.thread_fiber != NULL,LEVEL_NORMAL);
#endif

        instance.ready_queue[low_priority].insert(&(instance.main_thread));
        instance.current = &(instance.main_thread);
#ifdef PROBE_LOG
        instance.lasttick = reg_ops::gettickcount();
        instance.current->count = 0;
#endif
    instance.current->priority = low_priority;

    }

init函數的流程以下,在調用reset_threadpool初始化資源後,設置主線程和當前線程的狀態

init

調度器資源的獲取

get_readyqueue, getcurrentcontext,, get_currentreadyqueue是調度器三個資源狀態獲取方法,分別用於獲取指定條件下的線程資源

static contextqueue& get_readyqueue(uint32 priority)
{
    return instance.ready_queue[priority];
}
static contextqueue& get_currentreadyqueue()
{
    return instance.ready_queue[instance.current->priority];
}
static inline thread_dt* getcurrentcontext()
{
    return instance.current;
}
static inline void __yield()

get_readyqueue用於得到指定優先級的線程池隊列

get_currentreadyqueue用於得到當前正在執行線程所在優先級的線程池隊列

getcurrentcontext用於得到當前正在執行線程的線程對象

調度器任務的註冊方法

調度器經過create_thread函數將用戶函數綁定到空白線程對象,並註冊到ready_queue線程池中等待調度器提交處處理器執行

void thread_lib::create_thread(thread_fn thread_start,void* parg, uint32 priority)
#endif
{
    ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
    ASSERT(priority <= low_priority, LEVEL_NORMAL);
    thread_dt *pcontext = instance.spare_queue.current();
    ASSERT(pcontext!=NULL,LEVEL_NORMAL);
    instance.spare_queue.remove();
    pcontext->next = NULL;
    pcontext->priority = priority;
    pcontext->extdat->thread_start = thread_start;
    pcontext->extdat->thread_arg = parg;
#ifndef WIN32
    memset(pcontext->extdat->astack,0,threadext_dt::thread_stack_size);
#endif // WIN32
#ifdef PROBE_LOG
    pcontext->count = 0;
    memset(pcontext->funcname,' ',sizeof(pcontext->funcname));
    const char* funcnamebody=funcname;
    const char* current=funcname;
    while(*current!='\0')
    {
        if(*current==':')
        {
            funcnamebody=current+1;
        }
        ++current;
    }
    for(uint32 i=0;i!=sizeof(pcontext->funcname);++i)
    {
        char deschar=funcnamebody[i];
        if(deschar=='\0')
            break;
        pcontext->funcname[i]=deschar;
    }
#endif
#ifdef WIN32
    pcontext->thread_fiber  = CreateFiber(threadext_dt::thread_stack_size, &run_thread, NULL);
    ASSERT(pcontext->thread_fiber != NULL,LEVEL_NORMAL);
#else
    pcontext->sp = ((uint32)pcontext->extdat->astack)+threadext_dt::thread_stack_size-60;
    pcontext->r15 = (uint32)run_thread-8;
#endif
    uint32 status = clearinterruptacquire();
    instance.ready_queue[priority].insert(pcontext);
    interruptrestore(status);
}

流程以下

create_thread

線程棧空間地址以高位地址做爲棧底,逐漸向低位地址擴展棧空間範圍,在以上代碼中,實際的棧空間地址比定義最大棧空間地址小60個byte,目的使不一樣線程各自的棧空間之間留出足夠的安全距離,防止某個線程因爲stack overflow crash對其餘線程棧區產生破壞性覆蓋操做。

按照以上定義,處理通用寄存器文件GPR的第16個寄存器r15存放子過程的返回地址,因爲調度器自己屬於低級非搶佔式調度器,每一個線程任務在得到cpu計算資源後將不會被中斷打斷,一直執行至函數完畢,所以每一個子過程的返回地址都被註冊到class thread_lib的靜態成員函數run_thread, 具體的原理將在如下調度器過程說明中詳細闡述。實際子過程返回地址根據pcontext->r15 = (uint32)run_thread-8所示,位於run_thread函數label以前一個Dword地址, 這是因爲Microblaze處理器的branch模塊存在分支預測槽結構(delay slot

根據Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018, 55頁及58頁關於分支延遲槽的描述以下

A control hazard occurs when a branch is taken, and the next instruction is not immediately available. This results in stalling the pipeline. MicroBlaze provides delay slot branches and the optional branch target cache to reduce the number of stall cycles.

Delay Slots

When executing a taken branch with delay slot, only the fetch pipeline stage in MicroBlaze is flushed. The instruction in the decode stage (branch delay slot) is allowed to complete. This technique effectively reduces the branch penalty from two clock cycles to one. Branch instructions with delay slots have a D appended to the instruction mnemonic. For example, the BNE instruction does not execute the subsequent instruction (does not have a delay slot), whereas BNED executes the next instruction before control is transferred to the branch location.

A delay slot must not contain the following instructions: IMM, IMML, branch, or break. Interrupts and external hardware breaks are deferred until after the delay slot branch has been completed. Instructions that could cause recoverable exceptions (for example unaligned word or halfword load and store) are allowed in the delay slot.
If an exception is caused in a delay slot the ESR[DS] bit is set, and the exception handler is responsible for returning the execution to the branch target (stored in the special purpose register BTR). If the ESR[DS] bit is set, register R17 is not valid (otherwise it contains the address following the instruction causing the exception).

存在分支預測槽結構的處理器,在執行branch類指令時,因爲取指單元(IFU, Instruction Fetch Unit)預取的指令緩存會清空,致使IFU須要從新預取新的有效指令,所以跳轉指令的效果會延遲數個cycle才能生效,從而在流水線上產生若干個cycle的空泡(bubble),爲了遮蓋這些流水線空泡,處理器會提早執行跳轉指令以後的數條指令,所以對應上文賦值給r15的返回地址將比實際的run_thread地址提早兩個指令左右的執行寬度,用於配合分支延遲槽的提早執行特徵

調度器的操做方法

調度器的操做方法主要有以下幾種

  • 線程重調度方法reschedule
  • 線程讓步方法yield, __yield, lowpriorityyield, __lowpriorityyield
  • 線程任務執行方法run_thread
  • 線程任務推出方法threadexit
  • 線程休眠方法sleep
線程重調度函數 reschedule

函數實現的代碼以下

void thread_lib::reschedule()
{
    thread_dt *pnewctx;
#ifdef PROBE_LOG
    uint32 curtick=reg_ops::gettickcount();
    instance.current->count += curtick - instance.lasttick;
    do
    {
        communicator::overheat_delay();
        if(laterthan(reg_ops::gettickcount(),currenttick))
        {
            uint32 newtick=reg_ops::gettickcount();
            accidletime+=newtick-curtick;
            curtick=newtick;
            disp_diagnoisisinfo();
        }
    }
    while((pnewctx = instance.ready_queue[high_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[normal_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[low_priority].current())==NULL);
    {
        uint32 newtick=reg_ops::gettickcount();
        accidletime+=newtick-curtick;
        instance.lasttick = newtick;
    }
#else
    do
    {
        communicator::overheat_delay();
    }
    while((pnewctx = instance.ready_queue[high_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[normal_priority].current())==NULL &&
        (pnewctx = instance.ready_queue[low_priority].current())==NULL);
#endif
#ifdef WIN32
    instance.current = pnewctx;
    LPVOID next_fiber =instance.current->thread_fiber;
    ASSERT(next_fiber != NULL,LEVEL_NORMAL);
    SwitchToFiber(next_fiber);
#else
    thread_dt *poldctx = instance.current;
    instance.current = pnewctx;
    __Yield(poldctx,pnewctx);
#endif
}

重調度函數實現的功能很是簡單, 即按照從高到低的優先級從ready_queue線程池中取出下一個即將執行的線程元素,並使其取代當前執行線程得到處理器計算資源, 圖示流程以下。

reschedule

過溫判斷是一個阻塞式的函數調用過程,CPU所讀取到系統溫度傳感器(Temperature Sendor)的讀數高過預設的過溫閾值時,CPU須要反覆進入nop指令以等待系統溫度下降到安全閾值如下,等待函數overheat_delay的代碼以下

static inline void overheat_delay()
    {
        //Wait for a certain period of time.
        //1/2 CPU computation power.
        uint32 isrflag=clearinterruptacquire();
        uint32 tick=reg_ops::gettickcount();
        uint32 interval = timeinterval(instance.last_ohdelay_tick, tick);
        interval=Min(interval,reg_ops::tick_size*max_ohdelay_interval);
        uint32 tickguard=tick+interval;
        while(beforethan(reg_ops::gettickcount(),tickguard));
        instance.last_ohdelay_tick=reg_ops::gettickcount();
        interruptrestore(isrflag);
    }

在執行等待過程當中,須要將系統的中斷響應使能關閉,防止意外中斷的介入打斷系統降溫過程。在過程當中,系統須要讀入實時計數器(Timer/R, Real Timer Clock)的當前計數,並與預設的降溫等待間隔累加獲得tickguard值,當系統polling到的tick計數小於tickguard時,全系統除CPU外,業務均處於pending狀態已使系統待機降溫。tick 的概念相似Linux 系統中所提出的jiffies概念,即系統開機後,一段時間內累計的總時間週期基數,該計數用於系統執行一些延遲等待任務。

重調度函數的核心部分是用於執行線程替換的內嵌ASM函數__Yield

Yield函數使用內嵌式彙編調用接口,其接口形式以下

extern void __Yield(thread_dt* poldctx,thread_dt* pnewctx);

內嵌式彙編程序的參數傳遞方式通常有三種,經常使用的有經過彙編佔位符方式引入參數和經過處理器paramerter寄存器引入參數。

經過彙編佔位符引入參數的內聯彙編格式以下

__asm__ __volatile__("Instruction List" : Output : Input : Clobber/Modify);
  • Instruction List

Instruction List 是彙編指令序列。它能夠是空的,好比:__asm__ volatile__(""); 或 __asm ("");都是徹底合法的內聯彙編表達式

  • volatile

__volatile__是GCC 關鍵字volatile 的宏定義
#define __volatile__ volatile __volatile__或volatile 是可選的。若是用了它,則是向GCC 聲明不容許對該內聯彙編優化

  • Output

Output 用來指定當前內聯彙編語句的輸出

  • Input

Input 域的內容用來指定當前內聯彙編語句的輸入Output和Input中,格式爲形如「constraint」(variable)的列表(逗號分隔)

  • Clobber/Modify

有時候,你想通知GCC當前內聯彙編語句可能會對某些寄存器或內存進行修改,但願GCC在編譯時可以將這一點考慮進去。那麼你就能夠在Clobber/Modify域聲明這些寄存器或內存。這種狀況通常發生在一個寄存器出如今"Instruction List",但卻不是由Input/Output操做表達式所指定的,也不是在一些Input/Output操做表達式使用"r"約束時由GCC 爲其選擇的,同時此寄存器被"Instruction List"中的指令修改,而這個寄存器只是供當前內聯彙編臨時使用的狀況。

  • 通用約束

約束 Input/Output 意義 g I,O 表示可使用通用寄存器,內存,當即數等任何一種處理方式。 0,1,2,3,4,5,6,7,8,9 I 表示和第n個操做表達式使用相同的寄存器/內存

例如:

__asm__ ("popl %0 \n\t"
         "movl %1, %%esi \n\t"
         "movl %2, %%edi \n\t": 
         "=a"(__out): 
         "r" (__in1), 
         "r" (__in2));

此例中,%0對應的就是Output操做表達式,它被指定的寄存器是%eax,整個Instruction List的第一條指令popl %0,編譯後就成爲popl %eax,這時%eax的內容已經被修改,隨後在Instruction List後,GCC會經過movl %eax, address_of_out這條指令將%eax的內容放置到Output變量__out中。對於本例中的兩個Input操做表達式而言,它們的寄存器約 束爲"r",即要求GCC爲其指定合適的寄存器,而後在Instruction List以前將__in1和__in2的內容放入被選出的寄存器中,若是它們中的一個選擇了已經被__out指定的寄存器%eax,假如是__in1,那 麼GCC在Instruction List以前會插入指令movl address_of_in1, %eax,那麼隨後popl %eax指令就修改了%eax的值,此時%eax中存放的已經不是Input變量__in1的值了,那麼隨後的movl %1, %%esi指令,將不會按照咱們的本意——即將__in1的值放入%esi中——而是將__out的值放入%esi中了。

__Yield函數採用的第二種方式經過處理器parameter寄存器進行函數傳參, 根據Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018 195頁描述, r5-r10是Mircoblaze處理器GPR中的參數寄存器,引入參數按照形參順序依次放入寄存器位置

registerparam

在本例中使用r5和r6兩枚參數寄存器分別存放poldctx與pnewctx指針, 調用swi save 指令按照class/struct內存排布結構,依次向內存段將當前GPR的保存內容到poldctx對象空間,同時使用lwi load 指令將新線程中上下文內容換入處處理器的GPR空間。對於class thread_dt的實例在內存中的排布結構以下所示:

classstruct

以上結構爲class thread_dt在實例在內存中的排布結構,因爲線程池元素構造了類的實例對象數組,所以在內存構造中並無this指針佔用結構地址。

__Yield函數的源碼實現以下

.section .text
    .globl    __Yield
    .align    4
    .ent    __Yield
        .type __Yield, @function
__Yield:
    //save registers
    swi    r15,r5,4
    swi    r17,r5,8
    swi    r19,r5,12
    swi    r20,r5,16
    swi    r21,r5,20
    swi    r22,r5,24
    swi    r23,r5,28
    swi    r24,r5,32
    swi    r25,r5,36
    swi    r26,r5,40
    swi    r27,r5,44
    swi    r28,r5,48
    swi    r29,r5,52
    swi    r30,r5,56
    swi    r31,r5,60
    //store R1 in *poldctx->sp
    swi    r1,r5,0
    //set R1 to *pnewctx->sp
    lwi    r1,r6,0
    //restore registers
    lwi    r15,r6,4
    lwi    r17,r6,8
    lwi    r19,r6,12
    lwi    r20,r6,16
    lwi    r21,r6,20
    lwi    r22,r6,24
    lwi    r23,r6,28
    lwi    r24,r6,32
    lwi    r25,r6,36
    lwi    r26,r6,40
    lwi    r27,r6,44
    lwi    r28,r6,48
    lwi    r29,r6,52
    lwi    r30,r6,56
    rtsd    r15,8
    lwi    r31,r6,60
    .end    __Yield

代碼中 ".section .text" 表示該段代碼位於程序的text段,即指令正文, ".globl __Yield" 表示函數的label名 __Yield全局空間可見,以方便連接器按名字執行連接操做,」.align 4「 表示生成的二進制代碼按照4字節對齊排布,對應microblaze做爲RISC 32處理的格式要求 」.ent __Yield「表示做爲__Yield函數的正文起始, 」.type __Yield, @function"用於指定代碼的類型爲函數,__Yield部分代碼屬於函數子程。".end __Yield"表示整個函數的結尾。

__Yield函數首先調用swi指令將以r15開始的15個上下文GPR內容保存到poldxtx內存段基址+4偏址至+60偏址的內存空間,最後把棧空間指針從r1複製到poldxtx內存段基址+0偏址從而完成現有進程的上下文保存。

新線程任務的注入的過程與保存過程相反,調用lwi指令首先將pnewxtx內存段基地址+0偏址的棧空間sp指針地址複製到r1寄存器 而後將r15開始的15個上下文GPR內容從pnewctx基址的+4至+60偏址段內容複製處處理器的GPR, 最後使用rtsd將PC指針重定向到r15返回地址+8的位置,因爲上文r15寄存器內容被設置爲,r15=run_thread=8,所以重定向的PC指針地址爲run_thread函數的統一入口地址。此例中能夠看到rtsd 分支指令提早到 lwi r31, r6,60指令前執行,其緣由在於上文所提到的分支預測槽的影響,因爲rtsd指令須要至少延遲一個指令週期才能生效,爲了屏蔽延遲帶來的空泡,故將rtsd指令提早一個週期執行,使延遲時間槽正好被下一條指令的執行時間所填滿,在調度器頻繁使用線程切換時,能夠提升必定的指令執行的效率。

線程讓步函數

調度器提供了yield, __yield, lowpriorityyield, __lowpriorityyield四種線程讓步函數,其源碼以下:

static inline void __yield()
    {
        instance.ready_queue[instance.current->priority].rotate();
        setinterrupt();
        reschedule();
        clearinterrupt();
    }
    static inline void yield()
    {
        uint32 status = clearinterruptacquire();
        instance.ready_queue[instance.current->priority].rotate();
        interruptrestore(status);
        reschedule();
    }
    static inline void __lowpriorityyield()
    {
        uint32 priority = instance.current->priority;
        instance.ready_queue[priority].remove();
        instance.ready_queue[low_priority].inserthead(instance.current);
        setinterrupt();
        reschedule();
        clearinterrupt();
        instance.ready_queue[low_priority].remove();
        instance.ready_queue[priority].inserthead(instance.current);
    }
    static inline void lowpriorityyield()
    {
        uint32 priority = instance.current->priority;
        uint32 status = clearinterruptacquire();
        instance.ready_queue[priority].remove();
        instance.ready_queue[low_priority].inserthead(instance.current);
        interruptrestore(status);
        reschedule();
        status = clearinterruptacquire();
        instance.ready_queue[low_priority].remove();
        instance.ready_queue[priority].inserthead(instance.current);
        interruptrestore(status);
    }

__yieldyield函數區別在於__yield函數對應普通中斷模式而yield對應快速中斷模式,兩類線程讓步函數的流程十分類似。

yield

__lowpriorityyieldpriorityyield函數區別與上面的狀況相似,分別在普通中斷和快速中斷模式下將當前任務讓步到低優先級線程池隊列。

lowyield

線程任務運行與退出方法

線程退出方法threadexit的實現源碼以下:

static inline void threadexit()
    {
#ifdef PROBE_LOG
        instance.current->extdat->thread_start=NULL;
        instance.current->extdat->thread_arg=NULL;
#endif
        ASSERT((instance.current != NULL),LEVEL_NORMAL);
        uint32 status = clearinterruptacquire();

        instance.ready_queue[instance.current->priority].remove();

        interruptrestore(status);
        instance.spare_queue.insert(instance.current);
        instance.reschedule();
    }

調度器的線程退出函數提供了用戶線程完成線程函數任務後退出原線程上下文的方法,其流程以下圖

exit

線程運行方法threadexit的實現源碼以下:

void thread_lib::run_thread()
{
    instance.current->extdat->thread_start(instance.current->extdat->thread_arg);
    threadexit();
}

run_thread函數是調度器類的靜態函數方法,是用戶定義線程執行最重要的統一程序入口。全部子線程得到處理器計算資源的起始執行位置都將從run_thread開始,其流程以下。

run

在執行run_thread方法以前,通常已經經過reschedule方法將新線程任務設置爲當前的current線程,所以在run_thread執行中,函數任務將始終執行current線程的函數任務方法。經過run_thread運行線程任務,使調度器在多線程上下文切換的過程當中可以始終有效管理子線程的運行行爲。

線程任務休眠方法

調度器的休眠方法在主線程中執行以毫秒爲單位的運行時間延遲,在延遲時間內,經過循環yield調用重調度函數,使其餘子線程得之後臺運行,線程最終返回主線程時,經過讀取timer獲得tick計數判斷是否達到預約的時間延遲,因爲延遲斷定須要切換回主線程才能執行,當線程池中的用戶線程過多時,延遲等待的時間並不精確,一般會超出預設的等待時間。

sleep

線程中斷綁定方法

調度器提供了中斷綁定線程任務的接口,本例描述的低層次搶佔式調度器中沒有使用綁定中斷方式來增長調度任務的場景應用環境

中斷綁定接口的代碼以下

static inline void linkinterrupt(uint32 ISRID,ISRCallBack f)
{
    aISRFunc[ISRID] = f;
}

microbalze的處理器採用low-latancy中斷查詢機制,觸發中斷後會進入統一中斷服務接口函數,查詢具體的中斷事件源並處理中斷,在根據Xinlinx MicroBlaze Processor Reference Guide UG984 (v2018.3) Nov 14, 2018第85頁到86頁對中斷機制有詳細描述

Interrupt

MicroBlaze supports one external interrupt source (connected to the Interrupt input port). The processor only reacts to interrupts if the Interrupt Enable (IE) bit in the Machine Status Register (MSR) is set to 1. On an interrupt, the instruction in the execution stage completes while the instruction in the decode stage is replaced by a branch to the interrupt vector. This is either address C_BASE_VECTORS + 0x10, or with low-latency interrupt mode, the address supplied by the Interrupt Controller.

The interrupt return address (the PC associated with the instruction in the decode stage at the time of the interrupt) is automatically loaded into general purpose register R14. In addition, the processor also disables future interrupts by clearing the IE bit in the MSR. The IE bit is automatically set again when executing the RTID instruction.

Interrupts are ignored by the processor if either of the break in progress (BIP) or exception in progress (EIP) bits in the MSR are set to 1.

By using the parameter C_INTERRUPT_IS_EDGE, the external interrupt can either be set to level-sensitive or edge-triggered:

• When using level-sensitive interrupts, the Interrupt input must remain set until
MicroBlaze has taken the interrupt, and jumped to the interrupt vector. Software must
cknowledge the interrupt at the source to clear it before returning from the interrupt
handler. If not, the interrupt is taken again, as soon as interrupts are enabled when
returning from the interrupt handler.

• When using edge-triggered interrupts, MicroBlaze detects and latches the Interrupt
input edge, which means that the input only needs to be asserted one clock cycle. The
interrupt input can remain asserted, but must be deasserted at least one clock cycle
before a new interrupt can be detected. The latching of an edge-triggered interrupt is
independent of the IE bit in MSR. Should an interrupt occur while the IE bit is 0, it will
immediately be serviced when the IE bit is set to 1.

With periodic interrupt sources, such as the FIT Timer IP core, that do not have a method to clear the interrupt from software, it is recommended to use edge-triggered interrupts.

Low-latency Interrupt Mode

A low-latency interrupt mode is available, which allows the Interrupt Controller to directly supply the interrupt vector for each individual interrupt (using the Interrupt_Address input port). The address of each fast interrupt handler must be passed to the Interrupt Controller when initializing the interrupt system. When a particular interrupt occurs, this address is supplied by the Interrupt Controller, which allows MicroBlaze to directly jump to the handler code.

With this mode, MicroBlaze also directly sends the appropriate interrupt acknowledge to the Interrupt Controller (using the Interrupt_Ack output port), although it is still the responsibility of the Interrupt Service Routine to acknowledge level sensitive interrupts at the source.

This information allows the Interrupt Controller to acknowledge interrupts appropriately, both for level-sensitive and edge-triggered interrupt.

To inform the Interrupt Controller of the interrupt handling events, Interrupt_Ack is set to:

• 01: When MicroBlaze jumps to the interrupt handler code,

• 10: When the RTID instruction is executed to return from interrupt,

• 11: When MSR[IE] is changed from 0 to 1, which enables interrupts again.

The Interrupt_Ack output port is active during one clock cycle, and is then reset to 00

程序的入口地址+0x10的偏移地址部分即統一的中斷入口函數地址

.globl _start
        .section .vectors.reset, "ax"
    .align 2
        .ent _start
        .type _start, @function
_start:
        brai    _start1
        .end _start

    .globl _interrupthandle
        .section .vectors.interrupt, "ax"
    .align 2
        .ent _interrupthandle
        .type _interrupthandle, @function
_interrupthandle:
        brai    PreemptiveInterrupt
        .end _interrupthandle

當觸發中斷後,處理器會自動跳轉到_interrupthandle執行中斷服務任務,中斷服務會跳轉至PreemptiveInterrupt執行中斷查詢任務,執行相關的中斷處理任務,PreemptiveInterrupt中斷查詢服務函數代碼以下,因與調度器工做流程無關,此處不展開分析

void PreemptiveInterrupt(void)
{
    uint32 CurrentDPCLevel = dpc_lib::instance.level;
    dpc_lib::instance.level = dpc_lib::interruptlevel;
    uint32 IntrStatus  =  reg_ops::get_interrupt_status();
    while (IntrStatus)
    {
        uint32 IntrMask = ((IntrStatus^(IntrStatus-1))+1)>>1;
        uint32 IntrIndex;
        IntrIndex = bitscanreverse(IntrMask);
        aISRFunc[IntrIndex]();
        reg_ops::ack_interrupt(IntrMask);
        IntrStatus  = reg_ops::get_interrupt_status();
    }

    dpc_lib::instance.level = CurrentDPCLevel;
    while (true)
    {
        uint32 scheduleMask = dpc_lib::instance.enablemask & dpc_lib::instance.triggermask;
        uint32 NewDPCLevel = bitscan(scheduleMask);
        if (NewDPCLevel+1 >= CurrentDPCLevel)
            break;

        //Save Current DPCLevel and Registers
        dpc_lib::instance.level = NewDPCLevel+1;
        dpc_lib::instance.triggermask ^= (0x80000000u >> NewDPCLevel);
        __asm__ __volatile__ ("addik r1, r1, -4":::"memory");
        __asm__ __volatile__ ("swi r14, r1, 0":::"memory");

        //Enable Interrupt
        setinterrupt();

        dpc_lib::instance.dpcfun[NewDPCLevel]();

        //Disable Interrupt
        clearinterrupt();

        //Restore Current DPCLevel and Registers
        __asm__ __volatile__ ("lwi r14, r1, 0":::"memory");
        __asm__ __volatile__ ("addik r1, r1, 4":::"memory");
        dpc_lib::instance.level = CurrentDPCLevel;
    }
}
  • 調度器的工做流程

在理清調度器主要調度方法後,咱們能夠在下文對調度器展開模擬工做過程分析

調度器資源存活的內存物理區間

爲了在調度器工做過程當中實現更高的響應性能,則須要優化調度器工做中資源開銷的IO訪存性能, 本例中調度器採用靜態實例資源實現,經過連接手段將調度器資源放置在處理器的緊耦合內存(TCM, Tight Couppling Memory)上, MicroBlaze所擁的TCM內存稱爲BRAM供指令和數據同時使用。處理器內核訪問BRAM的延遲一般在5個時鐘週期之內,遠小於訪問主存DDR帶來的數百個時鐘週期延遲。
bram

在實際應用中咱們在連接腳本ld.script中自定義了一種快速數據段FASTDATA_SECTION(fastdata),從而將調度器資源指定到BRAM空間內

調度器資源創建位置代碼

#define FASTDATA_SECTION    __attribute__ ((section ("fastdata")))

thread_lib thread_lib::instance FASTDATA_SECTION;

ld.script

_STACK_SIZE = DEFINED(_STACK_SIZE) ? _STACK_SIZE : 0x100000;
_HEAP_SIZE = DEFINED(_HEAP_SIZE) ? _HEAP_SIZE : 0;

/* Define Memories in the system */

MEMORY
{
   microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl : ORIGIN = 0x00000050, LENGTH = 0x0001FFB0
   axi_7series_ddrx_0_S_AXI_BASEADDR : ORIGIN = 0xC0000000, LENGTH = 0x3BF60000
}

/* Specify the default entry point to the program */

ENTRY(_start)

/* Define the sections, and where they are mapped in memory */

SECTIONS
{
.vectors.reset 0x00000000 : {
   *(.vectors.reset)
} 
.vectors.interrupt 0x00000010 : {
   *(.vectors.interrupt)
} 
.vectors.exception 0x00000020 : {
   *(.vectors.exception)
} 

.text : {
   *(.text)
   *(.text.*)
   *(.gnu.linkonce.t.*)
} > microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl

.fastdata : {
   . = ALIGN(4);
   *(.fastdata)   
} > microblaze_0_i_bram_ctrl_microblaze_0_d_bram_ctrl
}

調度器的模擬工做流程

如下經過流程圖來展現整個調度器生命週期的工做過程

  1. 調度器完成初始化過程後處理器GPR被主線程的上下文環境所佔據,此時ready_queue中尚無有效線程單元,current指針指向main_thread實例空間(內容爲空)

init

  1. 用戶經過create_thread方法不斷從spare_queue中抽取空白線程單元填入各種線程任務方法,此時ready_queue中high/normal/low三種優先級隊列中被註冊了若干線程任務函數,current指針仍指向main_thread實例空間(內容爲空)

create

  1. 程序執行yield方法從ready_queue中按照從高到底優先級抽取線程對象,將新線程與current指針綁定,啓動線程讓步後,主線程上下文被保存至main_thread實例空間並插入到low-priority隊列的尾部。所以主線程只當優先級在其以前的全部線程池任務執行完畢才能從新得到CPU的計算資源。

yield

  1. yield方法會將除主線程之外全部線程執行的啓動位置定位到"run_thread", 當執行完current所指向的線程所承載的任務函數後,將已執行完的線程從ready_queue中刪除,放回到spare_queue隊列,並再次啓動reschedule過程從線程池挑選新的線程任務提交給處理器執行。

exit

  1. 當線程池中全部用戶程序任務被執行完畢並放回到spare_queue, 主線程將再次得到處理器計算資源,主線程將按照執行yield時的上下文繼續向下執行yield以後的程序。

return

  • 與調度器相關的事件與信號量方法

原始設計中一樣提供了與調度器運行相關的事件與信號量方法

線程事件方法

class event_dt
{
private:
    volatile uint32 status;
    thread_dt* volatile thread;
public:
    inline void init(uint32 initialvalue)
    {
        status = initialvalue;
        thread=NULL;
    }
    inline bool isset() const
    {
        return status!=0;
    }
    inline void __set()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        status = 1;
        if(thread != NULL)
        {
            thread_lib::get_readyqueue(thread->priority).insert(thread);
            thread=NULL;
        }
    }
    inline void set()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        __set();
        interruptrestore(oldstatus);
    }
    inline void reset()
    {
        status = 0;
    }
    inline void __wait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status == 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            setinterrupt();
            thread_lib::reschedule();
            clearinterrupt();
        }
    }
    inline void wait()
    {
        ASSERT(isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        uint32 oldstatus = clearinterruptacquire();
        if(status == 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            interruptrestore(oldstatus);
            thread_lib::reschedule();
        }
        else
        {
            interruptrestore(oldstatus);
        }
    }
};

本例實現的事件用於用戶線程之間的觸發等待通訊,其成員變量和方法的含義以下


成員變量 功能說明
status 事件阻塞式等待標誌,0爲阻塞等待,非0則爲非阻塞訪問
thread 用於承載阻塞等待線程的線程容器,待事件觸發後返回

操做函數 功能說明
init 事件初始化方法,用於設置事件實例的等待方式(默認爲0)和承載線程(設爲NULL)
isset 查看事件有無被觸發(status設置爲0)
__set 事件觸發函數,設置status狀態,並將事件等待所在線程從新註冊到線程池隊列末尾
set 事件觸發線程,用於開關中斷並調用__set
reset 事件重置方法,將事件實例狀態歸爲初始化狀態
__wait 普通中斷模式下的事件等待方法,爲整個事件通訊流程的發起函數
wait 快速中斷模式下的事件等待方法,爲整個事件通訊流程的發起函數
  1. 以下圖所示,事件類在被申請和初始化後,事件類內部的事件觸發標誌status被初始化爲0,wait操做的寄生線程爲NULL

eventinit

  1. 執行事件wait函數等待事件觸發後,wait操做的寄生線程被指向等待操做所在的線程(紅框標識)

eventwait

  1. wait函數將啓動調度器的reschedule重調度操做,將wait所在線程替換出CPU上下文環境,並放回到spare_queue,同時在event的容器內保留該寄生線程的備份, 用戶可按照業務須要在其餘用戶線程中放置事件促發操做(紫逛標識)

eventyield

  1. 當調度器將事件促發線程調換到CPU執行時,事件觸發操做執行,設置事件類中的促發標誌(status標黃),同時將事件類容器中的等待操做寄生線程從新插入回ready_queue的線程池,上述操做執行完畢,觸發線程將被刪除b並被放回spare_queue。

eventset

  1. 調度器從新將事件等待寄生線程調換回CPU,處理器上下文回到等待函數的在yield後的代碼並繼續向下執行,事件等待操做完成。

eventwaitdone

以上事件操做涉及多個線程,所以事件類的聲明須要放置在全局變量空間(.data或者.bss段,或其餘自定義的全局空間段),因爲調度器屬於搶佔式,所以對於觸發標誌的檢查不須要在while循環中執行,這是因爲reschedule的操做會將已執行的線程從線程池刪除而且觸發線程必定會在等待線程以前完成,採用循環檢查會致使第二輪次的yield出現線程空指針錯誤。

線程信號量方法

class semaphore_dt
{
private:
    volatile int32 status;
    thread_dt* volatile thread;

public:
    inline semaphore_dt()
    {
    }
    inline semaphore_dt(int32 initialvalue)
    {
        init(initialvalue);
    }
    inline void init(int32 initialvalue)
    {
        status = initialvalue;
        thread=NULL;
    }
    inline void __inc()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        if(status == 0)
        {
            if(thread == NULL)
            {
                ++status;
            }
            else
            {
                thread_lib::get_readyqueue(thread->priority).insert(thread);
                thread=NULL;
            }
        }
        else
        {
            ++status;
        }
    }
    inline void inc()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        __inc();
        interruptrestore(oldstatus);
    }
    inline int32 getresourcecount() const
    {
        return status;
    }
    inline bool isneedwait() const
    {
        return status <=0;
    }
    inline bool __trywait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status <= 0)
        {
            return false;
        }
        --status;
        return true;
    }
    inline bool trywait()
    {
        uint32 oldstatus = clearinterruptacquire();
        ASSERT(oldstatus,LEVEL_INFO);
        bool ret=__trywait();
        interruptrestore(oldstatus);
        return ret;
    }
    inline void __wait()
    {
        ASSERT(!isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        if(status <= 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();

            setinterrupt();
            thread_lib::reschedule();
            clearinterrupt();
        }
        else
        {
            --status;
        }
    }
    inline void wait()
    {
        ASSERT(isinterruptenabled(),LEVEL_NORMAL);
        ASSERT(dpc_lib::getdpclevel() == dpc_lib::threadlevel, LEVEL_NORMAL);
        uint32 oldstatus = clearinterruptacquire();
        if(status <= 0)
        {
            thread_lib::get_currentreadyqueue().remove();
            ASSERT(thread==NULL,LEVEL_NORMAL);
            thread=thread_lib::getcurrentcontext();
            interruptrestore(oldstatus);
            thread_lib::reschedule();
        }
        else
        {
            --status;
            interruptrestore(oldstatus);
        }
    }
};

本例的信號量用於用戶線程之間的旗幟信令握手,其成員變量和方法的含義以下


成員變量 功能說明
status 用於初始化旗語容許的最大信號量,可用於控制用戶線程事件執行的數量
thread 用於承載阻塞等待線程的線程容器,待事件觸發後返回

操做函數 功能說明
init 初始化信號量的最大受權數量,可用於控制使用該信號量同步的線程執行個數
__inc 旗語信號量的放回操做方法,用於恢復信號量的受權數
inc 執行旗語信號量的放匯操做,開關中斷響應,調用__inc
getresourcecount 得到當前可用的信號量個數
isneedwait 判斷當前信號量是否已經所有受權出(新等待受權任務須要等待)
__trywait 普通中斷模式下嘗試等待信號量容器受權信令,若沒有效受權,申請受權線程不造成阻塞
trywait 快速中斷模式下嘗試等待信號量容器受權信令,若沒有效受權,申請受權線程不造成阻塞
__wait 普通中斷模式下嘗試等待信號量容器受權信令,若沒有效受權,申請受權線程將阻塞
wait 快速中斷模式下嘗試等待信號量容器受權信令,若沒有效受權,申請受權線程將阻塞
  1. 信號量的初始化完成後,內部信號量容器會設置若干受權信令(status初始值),同時將寄生線程指針指向NULL

semainit

  1. 信號量寄生線程(紅框標識)經過屢次等待函數的執行從內部信號量容器取走信令(每執行一次wait函數,status減1,直至遞減至0),同時將寄生線程指針指向當前線程。在線程池中注入多個信令交還線程(紫框標識),當受權取空時,再次在寄生線程中啓動等待函數會迫使當前線程啓動reschedule函數重調度線程

semawait

  1. 當信令注入線程被調度器調度處處理器執行時,inc函數會交還一個受權信令(status加1),同時將寄生線程指針從新放回ready_queue線程池

semainc

  1. 當寄生線程從新調度會理器執行時,原來因爲缺少受權信令而進入重調度過程的函數得到新的信令得以繼續執行,信令受權申請結束。

semareturn

與事件類操做涉相似,信號量類的聲明也須要放置在全局變量空間,對於信號受權檢查的方式因爲與事件類相同的緣由,也不採用循環檢查的方式。

  • 後記

以上討論了一個很是簡單的RTOS調度器的實現,真實的分時系統所採用的調度器因爲平均時間片和中斷的引入,在結構和流程設計時會更加複雜。將來將嘗試在上述調度器上逐步升級加入時間片切換和中斷方式。
衡量一個調度器設計的優劣通常能夠考察其工做性能。

調度器的性能指標

調度器的最終目標是運行用戶程序,讓處理器被合理的利用。那麼,評價一個調度器算法的指標是什麼?
通常定量的指標,一般咱們第一個想到的就是CPU利用率。CPU利用率在必定程度上能夠說明問題,表示CPU的繁忙程度,但不夠細緻,由於咱們不清楚CPU到底在忙什麼。
從以系統爲中心和以用戶爲中心,大約有如下幾個能夠利用的指標:

  • 以系統爲中心:
  1. CPU利用率:CPU處理器運行指令的繁忙時間的佔比
  2. 吞吐量:表示單位時間內所完成的做業個數
  3. 平均週轉時間:測量任務進入和離開系統平均所花的時間(t1+t2+...+tn)/n
  4. 平均等待時間:表示系統任務的平均等待時間(w1+w2_...wn)/n
  • 以用戶爲中心:
  1. 響應時間:表示特定的任務i的週轉時間ti
  2. 響應時間方差:表示給定進程的實際響應時間與其指望值的統計差別

除了上面介紹的定量指標,值得一提的調度器算法的定性指標:

  1. 飢餓:在任何進程做業的組合中,調度策略都應該確保全部的任務一直都有進展,若是因爲某種緣由,一個進程任務並無任何進展,咱們把這種狀況稱之爲飢餓。這種狀況的定量表現是,某個特定任務的響應時間沒有上限。
  2. 護送效應(convey effect):在任何進程做業的組合中,調度策略應該預防長時間運行的某個任務徹底佔據CPU的使用。若是出於某種緣由,任務的調度符合固定的規律(相似於軍隊的護衛),這種狀況稱之爲護送效應。這種現象的定量表現爲,任務的響應時間的方差很大。

調度算法

這裏介紹幾種典型的非搶佔式和搶佔式的算法。

  1. 非搶佔式的調度算法
  • 先到先服務算法(FCFS, First-Come First-Served)

這個算法會用到的屬性是進程的到達時間,也就是啓動運行一個進程的時間。先啓動的進程會優先被調度器選中,以下圖所示,P1是第一個到達的,而後再是P2, P3,因此根據先到先服務原則,調度器老是會優先選擇P1,而後P2,P3。

優勢:這個算法有一個很好的性質,就是任何進程都不會飢餓,也就是說算法沒有回致使任務進程拒絕服務的內在偏向

缺點:但因爲上面這個性質,響應時間的方差會很大。舉個例子,一個長時間任務到達後,後面跟着一個短期的任務,那麼短任務被長做業擋在後面,它的響應時間就會很糟糕,因爲護送效應致使低下的CPU利用率。因此這個算法並無對短任何給予任何優先考慮。

  • 最短做業優先(SJF, Shortest Job First)

既然先到先服務對短任務不是很友好,那麼這個算法就是爲了讓短做業得到更好的響應時間。
優勢:調度器會優先選擇時間較短的任務,讓短任務得到更好的響應時間;
缺點:有可能會讓一個長時任務飢餓。
解決這個缺點有一個方案,當一個做業的年齡到達一個閾值,調度器忽略SJF, 選擇FCFS算法。

  • 優先級算法

出於調度的目的,多數OS會給每一個進程賦予一個屬性——優先級。好比,在UNIX系統中,每一個用戶級進程開始時都有一個固定的默認優先級。Ready Queue中包含多個子隊列,每一個隊列都對應着一個優先級,每一個子隊列內部採用FCFS算法,以下圖所示:

優勢:靈活,能夠提供差別化服務

缺點:會產生飢餓,能夠根據進程的等待時間來提升優先級

  1. 搶佔式調度算法

搶佔式與非搶佔式的區別在於:在一個新進程或剛完成I/O的進程進入到ready queue中時,會從新評估一些屬性(好比剩餘執行時間),以決定要不要搶佔當前正在運行的進程。原則上說,上面討論到的任何一個非搶佔式算法都能改形成搶佔式的,好比FCFS算法,每次從新進入就緒隊列時,調度器能夠決定搶佔當前正在執行的進程(若是新任務的到達時間比較早),相似的,SJF和優先級也同樣。
下面介紹兩種搶佔式算法:

  • 最短剩餘時間優先(SRTF, Shortest Remaining Time First)

調度器會估計每一個進程的運行時間,當一個進程回到就緒隊列,調度器計算這個任務的剩餘處理時間,根據計算結果,放入ready queue中合適的位置。若是該進程的剩餘時間比當前的進程要少,那麼調度器就會搶佔當前運行的任務,讓這個新任務先執行。跟FCFS算法相比,最短剩餘時間的平均等待時間通常比較低。

  • RR(Round Robin)調度器

分時環境特別適合使用RR調度器,即每一個進程都應該獲得處理器時間的一部分。所以,非搶佔式的調度器就不適合這種環境。假設有n個就緒的進程,調度器把CPU資源分紅一個一個時間片,而後分配給各個進程,以下圖所示。就緒隊列裏每一個進程都會獲得處理器的時間片q。當時間片用完了,當前調度的進程會被放入就緒隊列的尾部,造成一個ring。但考慮到在不通進程切換會有開銷,因此選擇時間片q的適合要考慮上下文切換。

寫在後面的話

這篇文章從思考到寫成大概用了1個月的時間,做爲一個硬件工程師寫軟件源碼分析摻入了不少對嵌入式高手來說顯得囉裏吧嗦的話,請大腿們海涵,謹以此文向老貓崇拜的兩位技術偶像致敬,第一位大牛的做品被老貓在2019年反覆拜讀並從中窺探了系統設計的奧妙(瞭解Linux驅動設計,入門了高性能固件的設計思想)。強烈推薦這位偶像的一篇文章理性的賭徒-SSD寫帶寬保持恆穩的祕密 讓老貓感慨大牛把科學工程與技術作成了蒙娜麗莎般高雅的藝術,活出了老貓心目中技術領袖該有的風範。另一位偶像集風騷與技術爲一身,教導老貓理解頂級驗證工程師的發展方向是系統架構工程師,通過了一年半的系統實踐,老貓已在系統之路上略有收穫。向在寫做此文中提供了技術答疑幫助的帕啊哥,大腿馬哥,肌肉強哥表達抱大腿通常的革命感情,向提供了研究項目平臺和實踐機會的唐總表達由衷的感謝。

相關文章
相關標籤/搜索