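[Homemade OS] Atomic Operations, Inter-Processor Interrupts, Reader-Writer Locks, and PRWLock

These are my notes on reading the paper Scalable Read-mostly Synchronization Using Passive Reader-Writer Locks,
together with an implementation of it on JOS. They also cover how the LAPIC works and how IPIs are implemented.
New features supported in this article:

  • Atomic operations
  • Reader-writer locks
  • IPIs targeted at a single core
  • PRWLock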

GitHub: https://github.com/He11oLiu/MOS

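Paper reading notes

Research background

  • Single-core performance gains have hit a bottleneck, so further performance comes from going multicore.
  • Single-core workloads are mainly compute-intensive models; multicore workloads are mainly parallel models.
  • The problem parallel models face: multiple processors share data structures, so synchronization primitives are needed to keep them consistent.
  • At a lower level this requires memory consistency, which in turn causes scalability problems.
    • Strict Consistency Memory Barrier
    • Sequential Consistency TSO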

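Scalability

Scalability cannot be discussed without Amdahl's law:

S_latency(s) = 1 / ((1 - p) + p / s)

Here p is the fraction of the work that can be parallelized, s is the speedup of that fraction, and 1 - p is the part that cannot be parallelized. On a given processor, the (1 - p) part arises for the following reasons:

  • Waiting at a memory barrier for earlier instructions to complete
  • Waiting to fetch the latest value under the MESI protocol
  • Waiting for another processor to release a lock
  • Stalling because inter-core communication bandwidth is limited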
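To make the formula concrete, here is a minimal sketch in C that evaluates it. The function name amdahl_speedup is my own choice for illustration and does not come from the paper or from JOS.

```c
#include <assert.h>

/* Amdahl's law.
 * p: fraction of the work that can be parallelized (0..1)
 * s: speedup of that fraction, e.g. the number of cores
 * Returns the overall speedup of the whole task. */
static double amdahl_speedup(double p, double s)
{
    assert(p >= 0.0 && p <= 1.0 && s > 0.0);
    return 1.0 / ((1.0 - p) + p / s);
}
```

For example, with p = 0.95 the result stays below 20 no matter how large s gets, because the serial 5% alone caps the speedup at 1 / 0.05. This is why shrinking the (1 - p) causes listed above matters more than adding cores.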

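About reader-writer locks

  • Readers do not block each other; readers and writers do.
  • Suited to applications that read data far more often than they write it.
  • Single-core approach: reader locks do not conflict, and a writer must wait until all reader locks are released. This is implemented with a reader counter.
  • The most direct multicore approach: each core keeps its own reader lock, and a writer must wait until every reader lock has been released before it can take the lock.

Attempts made by existing RWLocks

BRLOCK C-SNZI

  • A reader acquires the lock on its own core.
  • With readers only, each reader touches just its own core's lock, so scalability is good.
  • A writer must acquire the mutex on every core, which costs a fixed latency that depends on the number of processors.
  • SNZI uses a tree to optimize this to some extent.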
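The per-core idea can be sketched in user-space C11 as follows. This is a simplified illustration, not the BRLock or C-SNZI implementation: NCORES, struct brlock and all function names are hypothetical, and the caller is assumed to pass in the index of the core it runs on.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

#define NCORES 4 /* hypothetical core count for this sketch */

/* One lock per core, padded so each one sits on its own cache line. */
struct percore_lock {
    atomic_bool held;
    char pad[64 - sizeof(atomic_bool)];
};

struct brlock {
    struct percore_lock core[NCORES];
};

static void slot_lock(struct percore_lock *l)
{
    bool expected = false;
    /* Spin until we manage to flip held from false to true. */
    while (!atomic_compare_exchange_weak(&l->held, &expected, true))
        expected = false;
}

static void slot_unlock(struct percore_lock *l)
{
    atomic_store(&l->held, false);
}

/* Reader: touches only the lock that belongs to its own core. */
static void br_read_lock(struct brlock *b, int core)
{
    assert(core >= 0 && core < NCORES);
    slot_lock(&b->core[core]);
}

static void br_read_unlock(struct brlock *b, int core)
{
    slot_unlock(&b->core[core]);
}

/* Writer: takes every per-core lock, always in the same order. */
static void br_write_lock(struct brlock *b)
{
    for (int i = 0; i < NCORES; i++)
        slot_lock(&b->core[i]);
}

static void br_write_unlock(struct brlock *b)
{
    for (int i = NCORES - 1; i >= 0; i--)
        slot_unlock(&b->core[i]);
}
```

The reader path never writes to another core's cache line, which is where the scalability comes from. The writer path is a loop over all cores, which is why writer latency grows with the processor count.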

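RCU

  • Weakened semantics: a reader may see stale data (a logical inconsistency).
  • Readers take no lock; a writer and readers can run at the same time.
  • Write the value first, then update the pointer.
  • Writers are expensive because they must dispose of the old data.
  • Garbage collection waits for one full round of scheduling on every core (without preemption).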
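The "write the value first, then update the pointer" step can be sketched with C11 atomics. This is only an illustration of the publish pattern, not the Linux RCU API: struct config, current_cfg and both function names are made up for the example, and the grace period before freeing old data is not modeled.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>

struct config {
    int value;
};

/* The shared pointer that readers follow. */
static _Atomic(struct config *) current_cfg;

/* Reader: no lock, just one atomic pointer load. */
static int rcu_style_read(void)
{
    struct config *c =
        atomic_load_explicit(&current_cfg, memory_order_acquire);
    return c ? c->value : -1;
}

/* Writer: fill in the new copy first, then publish it by swapping
 * the pointer. Returns the old copy. The caller may free it only
 * after every reader that could still hold it has finished. */
static struct config *rcu_style_update(int value)
{
    struct config *fresh = malloc(sizeof(*fresh));
    assert(fresh != NULL);
    fresh->value = value;
    return atomic_exchange_explicit(&current_cfg, fresh,
                                    memory_order_acq_rel);
}
```

A reader that loaded the old pointer just before the swap keeps reading the old value, which is the stale-data behavior noted in the list. Deciding when the returned old copy is safe to free is the expensive part for writers.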


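Bounded staleness (short memory visibility)

Bounded staleness means that a write becomes visible to the other cores within a very short time window: each core's private cache is very small, so a stale line is very likely to be replaced, and the core then sees the latest data. The chart below gives the details.

[Figure: image not available]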

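PRWLock design

  • Within a short time, every processor can see the latest version number.
  • It exploits the TSO architecture: the version update implicitly signals that a reader has left the critical section.
  • It does not rely solely on short-term visibility. For the special cases, consistency is guaranteed by sending an IPI that asks the core to report. IPI delivery is cheap, and the costs of several IPIs can overlap.
  • Two modes support scheduling (sleep and preemption).

The two modes of PRWLock

Passive Mode

  • Handles readers that have not been rescheduled
  • Relies on the bounded staleness of shared memory
  • Weak control: implicit feedback through the version number, plus an IPI in a few cases

Active Mode (supports sleep and scheduling, similar to BRLock)

  • Handles processes that have been rescheduled (slept or preempted)
  • Tracks their number through the active counter
  • Strong control: the writer actively monitors the counter
  • The writer actively waits

PRWLock walkthrough video:

Youku video

Correctness of PRWLock

  • When the writer sees that a reader has picked up the latest version, TSO guarantees that the reader has also seen the writer's lock, so the writer can be sure this reader will not enter the critical section again.
  • For a core that takes a long time to see the latest version, or on which no reader is trying to take the reader lock, an IPI shortens the wait and rules out waiting forever.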

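Reducing IPIs in the kernel PRWLock

  • A lock domain describes the set of processors a lock applies to.
  • If a core has context-switched to another process, there is no need to care about the lock on that core.
  • Taking lock domains online and offline avoids unnecessary consistency checks.
  • Memory barriers must be used to keep this consistent.

User-level PRWLock

User space differs in two ways:

  • User code cannot disable preemption at will.
  • User code cannot send inter-processor interrupts (IPIs).

So the user-level PRWLock works as follows:

  • A preemption flag prevents the thread from being preempted during the critical window.
  • The writer must trap into the kernel to send IPIs.

PRWLock performance analysis

Readers

  • No memory barriers between readers (they are independent of each other).
  • Taking a lock domain online or offline is rare and is itself a performance optimization, so its memory barriers matter little.
  • Readers with long critical sections behave as with a traditional lock.

Writers

  • An IPI costs only a few hundred cycles, and the writer would have to wait anyway.
  • Multiple writers can pass the lock directly from one to the next.

Summary

  • Exploits the short time it takes a memory write to become globally visible
  • Uses version numbers, designed around TSO, to maintain the semantics implicitly
  • Uses IPIs to cover the special cases
  • Uses two modes to support scheduling
  • No coupling (memory barriers) between readers, which improves reader performance
  • PWAKE: distributed wakeup, which increases wakeup parallelism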

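Porting PRWLock to JOS

Inter-processor interrupts in JOS

About the Local APIC

In an APIC-based system, every core has a Local APIC. The Local APIC handles the CPU-specific interrupt configuration. Among other things, it contains the Local Vector Table (LVT), which configures event interrupts.

In addition, there is an I/O APIC outside the CPU (for example the Intel 82093AA) as part of the chipset. It provides multiprocessor interrupt management, routing interrupts statically or dynamically among the processors.

Inter-Processor Interrupts (IPIs) are interrupts triggered by a Local APIC; they are typically used for things like scheduling across CPUs.

To enable the Local APIC to receive interrupts, set bit 8 of the Spurious Interrupt Vector Register.

The biggest advantage of the APIC timer is that every CPU core has its own. The PIT (Programmable Interval Timer), by contrast, is a single shared timer.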

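  • Periodic mode

    In periodic mode, software sets the Initial Count register and the Local APIC copies the value into the Current Count register. The Local APIC decrements the current count until it reaches 0, at which point it raises an IRQ (i.e. triggers an interrupt) and reloads the current count from the Initial Count register; the cycle then repeats. This lets the Local APIC fire interrupts at a fixed interval.

  • One-shot mode

    Same as above, but the current count is not reloaded from the initial count.

  • TSC-Deadline mode

    An IRQ is raised when the CPU's time-stamp counter reaches the deadline.

Source: blog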
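The difference between the modes can be sketched in C. The mode constants follow the LVT Timer register layout as I understand it from the Intel SDM (timer mode in bits 18:17), and LVT_TIMER_PERIODIC has the same value as the PERIODIC constant JOS uses. The struct timer_model and timer_tick are a purely hypothetical software model of the countdown, not hardware or JOS code.

```c
#include <assert.h>
#include <stdint.h>

/* Timer-mode field of the LVT Timer register (bits 18:17). */
#define LVT_TIMER_ONESHOT      0x00000000u
#define LVT_TIMER_PERIODIC     0x00020000u
#define LVT_TIMER_TSC_DEADLINE 0x00040000u

/* Compose the value written to the LVT Timer register. */
static uint32_t lvt_timer_value(uint32_t mode, uint8_t vector)
{
    assert(mode == LVT_TIMER_ONESHOT || mode == LVT_TIMER_PERIODIC ||
           mode == LVT_TIMER_TSC_DEADLINE);
    return mode | vector;
}

/* Software model of the count-down behavior. */
struct timer_model {
    uint32_t initial;  /* Initial Count */
    uint32_t current;  /* Current Count */
    int periodic;      /* nonzero: reload after firing */
};

/* Advance the model by one tick; returns 1 if an interrupt fires. */
static int timer_tick(struct timer_model *t)
{
    if (t->current == 0)
        return 0;                  /* expired one-shot timer stays idle */
    if (--t->current != 0)
        return 0;
    if (t->periodic)
        t->current = t->initial;   /* periodic mode reloads the count */
    return 1;
}
```

With an initial count of 3, six ticks fire two interrupts in periodic mode and only one in one-shot mode.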

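Each local APIC has 32-bit registers, an internal clock, a local timer device, and two extra IRQ lines, LINT0 and LINT1, reserved for local interrupts. All local APICs are connected to the I/O APIC, forming a multi-level APIC system.

The Intel x86 architecture provides two interrupt pins, LINT0 and LINT1. They are normally connected to the Local APIC and receive the interrupt signals it delivers. When the Local APIC is disabled, LINT0 and LINT1 are configured as the INTR and NMI pins, i.e. the external I/O interrupt pin and the non-maskable interrupt pin.

Source: blog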


The local APIC registers are memory mapped to an address that can be found in the MP/MADT tables. Make sure you map these to virtual memory if you are using paging. Each register is 32 bits long, and expects to be written and read as a 32 bit integer. Although each register is 4 bytes, they are all aligned on a 16 byte boundary.

Now let's take another detailed look at how JOS implements inter-processor interrupts.

This mapping is created with the cache-disable and write-through attributes, which makes it suitable for I/O access.

void lapic_init(void)
{
    if (!lapicaddr)
        return;
    // lapicaddr is the physical address of the LAPIC's 4K MMIO
    // region. Map it in to virtual memory so we can access it.
    lapic = mmio_map_region(lapicaddr, 4096);
    // Enable local APIC; set spurious interrupt vector.
    lapicw(SVR, ENABLE | (IRQ_OFFSET + IRQ_SPURIOUS));

    // The timer repeatedly counts down at bus frequency
    // from lapic[TICR] and then issues an interrupt.
    // If we cared more about precise timekeeping,
    // TICR would be calibrated using an external time source.
    lapicw(TDCR, X1);
    lapicw(TIMER, PERIODIC | (IRQ_OFFSET + IRQ_TIMER));
    lapicw(TICR, 10000000);

    // Leave LINT0 of the BSP enabled so that it can get
    // interrupts from the 8259A chip.
    //
    // According to Intel MP Specification, the BIOS should initialize
    // BSP's local APIC in Virtual Wire Mode, in which 8259A's
    // INTR is virtually connected to BSP's LINTIN0. In this mode,
    // we do not need to program the IOAPIC.
    if (thiscpu != bootcpu)
        lapicw(LINT0, MASKED);

    // Disable NMI (LINT1) on all CPUs
    lapicw(LINT1, MASKED);

    // Disable performance counter overflow interrupts
    // on machines that provide that interrupt entry.
    if (((lapic[VER] >> 16) & 0xFF) >= 4)
        lapicw(PCINT, MASKED);

    // Map error interrupt to IRQ_ERROR.
    lapicw(ERROR, IRQ_OFFSET + IRQ_ERROR);

    // Clear error status register (requires back-to-back writes).
    lapicw(ESR, 0);
    lapicw(ESR, 0);

    // Ack any outstanding interrupts.
    lapicw(EOI, 0);

    // Send an Init Level De-Assert to synchronize arbitration ID's.
    lapicw(ICRHI, 0);
    lapicw(ICRLO, BCAST | INIT | LEVEL);
    while (lapic[ICRLO] & DELIVS)
        ;

    // Enable interrupts on the APIC (but not on the processor).
    lapicw(TPR, 0);
}

lapic_init maps the LAPIC at lapicaddr into virtual memory and initializes the LAPIC's interrupt parameters.

// Local APIC registers, divided by 4 for use as uint32_t[] indices.
#define ID (0x0020 / 4) // ID

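The macros divide by 4 because the MMIO region is mapped and its address stored in volatile uint32_t *lapic;. The element type of that pointer is uint32_t, so every byte offset is divided by 4 to get an array index.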
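The indexing can be checked with a small hosted sketch. Here fake_page is an ordinary array standing in for the mapped MMIO page (an assumption for illustration only); lapicw has the same shape as the JOS helper, including the read of ID that JOS uses to wait for the write to finish.

```c
#include <assert.h>
#include <stdint.h>

/* Register offsets in bytes, divided by 4 to index a uint32_t array. */
#define ID  (0x0020 / 4)
#define EOI (0x00B0 / 4)

/* Stand-in for the 4K LAPIC MMIO page; a real kernel maps hardware here. */
static uint32_t fake_page[4096 / sizeof(uint32_t)];
static volatile uint32_t *lapic = fake_page;

static void lapicw(int index, uint32_t value)
{
    assert(index >= 0 && index < (int)(4096 / sizeof(uint32_t)));
    lapic[index] = value;
    (void)lapic[ID]; /* read back, as JOS does, to wait for the write */
}
```

Writing through lapic[EOI] lands at byte offset 0xB0 from the base, because pointer arithmetic on uint32_t * scales the index by 4.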

Now let's look at the main APIC registers.

  • EOI Register

    Write to the register with offset 0xB0 using the value 0 to signal an end of interrupt. A non-zero value may cause a general protection fault.

    #define EOI (0x00B0 / 4) // EOI
    
    // Acknowledge interrupt.
    void lapic_eoi(void)
    {
        if (lapic)
            lapicw(EOI, 0);
    }
  • Local Vector Table Registers

    There are some special interrupts that the processor and LAPIC can generate themselves. While external interrupts are configured in the I/O APIC, these interrupts must be configured using registers in the LAPIC. The most interesting registers are:

    0x320 = lapic timer

    0x350 = lint0

    0x360 = lint1

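    JOS keeps only the BSP's LINT0 enabled here, to receive interrupts from the 8259A. LINT0 on the other cores and LINT1 (the non-maskable interrupt) on all cores are set to MASKED.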

    // Leave LINT0 of the BSP enabled so that it can get
    // interrupts from the 8259A chip.
    //
    // According to Intel MP Specification, the BIOS should initialize
    // BSP's local APIC in Virtual Wire Mode, in which 8259A's
    // INTR is virtually connected to BSP's LINTIN0. In this mode,
    // we do not need to program the IOAPIC.
    if (thiscpu != bootcpu)
        lapicw(LINT0, MASKED);
    
    // Disable NMI (LINT1) on all CPUs
    lapicw(LINT1, MASKED);
  • Spurious Interrupt Vector Register

    The offset is 0xF0. The low byte contains the number of the spurious interrupt. As noted above, you should probably set this to 0xFF. To enable the APIC, set bit 8 (or 0x100) of this register. If bit 12 is set then EOI messages will not be broadcast. All the other bits are currently reserved.

    // Enable local APIC; set spurious interrupt vector.
    lapicw(SVR, ENABLE | (IRQ_OFFSET + IRQ_SPURIOUS));
  • Interrupt Command Register

    The interrupt command register is made of two 32-bit registers; one at 0x300 and the other at 0x310.

    #define ICRHI (0x0310 / 4) // Interrupt Command [63:32]
    
    
    #define ICRLO (0x0300 / 4) // Interrupt Command [31:0]

    It is used for sending interrupts to different processors.

    The interrupt is issued when 0x300 is written to, but not when 0x310 is written to. Thus, to send an interrupt command one should first write to 0x310, then to 0x300.

    So ICRHI must be written first; the interrupt is issued when ICRLO is written.

    At 0x310 there is one field at bits 24-27, which is local APIC ID of the target processor (for a physical destination mode).

    lapicw(ICRHI, apicid << 24);

    ICRHI holds the local APIC ID of the target core. The apicid here is the cpu_id assigned in order while the MP Floating Pointer Structure is read.

    The bit layout of ICRLO is important:

    • Delivery mode (bits 8-10)
    #define INIT 0x00000500 // INIT/RESET
    #define STARTUP 0x00000600 // Startup IPI
    • Destination shorthand (bits 18-19)
    #define SELF 0x00040000 // Send to self
    #define BCAST 0x00080000 // Send to all APICs, including self.
    #define OTHERS 0x000C0000 // Send to all APICs, excluding self.

    If no shorthand is set, the interrupt is sent to the core specified in ICRHI (0x310).

Putting this together, here is a wrapper for sending an IPI:

void lapic_ipi(int vector)
{
    lapicw(ICRLO, OTHERS | FIXED | vector);
    while (lapic[ICRLO] & DELIVS)
        ;
}

Both sending an IPI and acknowledging one are done by writing directly to the corresponding MMIO addresses, so they are fairly simple.
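The PRWLock code later in this article calls lapic_ipi_dest(id, vector) to reach a single core, but its body is not shown. The following is a minimal sketch of how such a function might look, based only on the ICRHI/ICRLO rules described above; it is my assumption, not the author's code. To keep it runnable outside a kernel, fake_lapic is a plain array standing in for the MMIO page.

```c
#include <assert.h>
#include <stdint.h>

#define ID     (0x0020 / 4)
#define ICRLO  (0x0300 / 4) /* Interrupt Command [31:0] */
#define ICRHI  (0x0310 / 4) /* Interrupt Command [63:32] */
#define FIXED  0x00000000   /* fixed delivery mode, no shorthand */
#define DELIVS 0x00001000   /* delivery status: send pending */

/* Stand-in for the mapped LAPIC page (hypothetical, for illustration). */
static uint32_t fake_lapic[1024];
static volatile uint32_t *lapic = fake_lapic;

static void lapicw(int index, uint32_t value)
{
    lapic[index] = value;
    (void)lapic[ID]; /* wait for the write to finish by reading */
}

/* Send `vector` to the single core whose local APIC ID is `apicid`. */
static void lapic_ipi_dest(int apicid, int vector)
{
    assert(apicid >= 0 && apicid < 256);
    assert(vector >= 0 && vector < 256);
    /* Destination first: writing ICRHI does not send anything. */
    lapicw(ICRHI, (uint32_t)apicid << 24);
    /* No shorthand bits set, so the target is taken from ICRHI. */
    lapicw(ICRLO, FIXED | (uint32_t)vector);
    while (lapic[ICRLO] & DELIVS)
        ;
}
```

The only differences from lapic_ipi are the extra ICRHI write and the absence of the OTHERS shorthand.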

To test this, first set up the IPI interrupt in trap:

#define T_PRWIPI 20 // IPI report for PRWLock
void prw_ipi_report(struct Trapframe *tf)
{
    cprintf("%d in ipi report\n",cpunum());
}

Add dispatch for this interrupt in trap_dispatch:

case T_PRWIPI:
        prw_ipi_report(tf);
        break;

Finally, during init, the BSP sends the IPI to all other cores:

lapic_ipi(T_PRWIPI);

Run QEMU with 4 emulated cores to check that the IPI works:

1 in ipi report
3 in ipi report
2 in ipi report

The cores other than the BSP correctly receive the IPI and enter the interrupt handler.

Implementing a traditional kernel-mode reader-writer lock in JOS

typedef struct dumbrwlock {
    struct spinlock lock;
    atomic_t readers;
}dumbrwlock;

void rw_initlock(dumbrwlock *rwlk)
{
    spin_initlock(&rwlk->lock);
    rwlk->readers.counter = 0;
}

void dumb_wrlock(dumbrwlock *rwlk)
{
    spin_lock(&rwlk->lock);
    while (rwlk->readers.counter > 0)
        asm volatile("pause");
}

void dumb_wrunlock(dumbrwlock *rwlk)
{
    spin_unlock(&rwlk->lock);
}

void dumb_rdlock(dumbrwlock *rwlk)
{
    while (1)
    {
        atomic_inc(&rwlk->readers);
        if (!rwlk->lock.locked)
            return;
        atomic_dec(&rwlk->readers);
        while (rwlk->lock.locked)
            asm volatile("pause");
    }
}

void dumb_rdunlock(dumbrwlock *rwlk)
{
    atomic_dec(&rwlk->readers);
}

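This exposed a fairly big problem: JOS does not implement atomic operations, so they have to be implemented before going any further.

Implementing atomic operations in JOS

Atomic operations are modeled on the Linux 2.6 kernel: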

#ifndef JOS_INC_ATOMIC_H_
#define JOS_INC_ATOMIC_H_

/* * Atomic operations that C can't guarantee us. Useful for * resource counting etc.. */

#include <inc/types.h>

#define LOCK "lock ; "

/* * Make sure gcc doesn't try to be clever and move things around * on us. We need to use _exactly_ the address the user gave us, * not some alias that contains the same information. */
typedef struct
{
    volatile int counter;
} atomic_t;

#define ATOMIC_INIT(i) \
    {                  \
        (i)            \
    }

/** * atomic_read - read atomic variable * @v: pointer of type atomic_t * * Atomically reads the value of @v. */
#define atomic_read(v) ((v)->counter)

/** * atomic_set - set atomic variable * @v: pointer of type atomic_t * @i: required value * * Atomically sets the value of @v to @i. */
#define atomic_set(v, i) (((v)->counter) = (i))

/** * atomic_add - add integer to atomic variable * @i: integer value to add * @v: pointer of type atomic_t * * Atomically adds @i to @v. */
static __inline__ void atomic_add(int i, atomic_t *v)
{
    __asm__ __volatile__(
        LOCK "addl %1,%0"
        : "=m"(v->counter)
        : "ir"(i), "m"(v->counter));
}

/** * atomic_sub - subtract the atomic variable * @i: integer value to subtract * @v: pointer of type atomic_t * * Atomically subtracts @i from @v. */
static __inline__ void atomic_sub(int i, atomic_t *v)
{
    __asm__ __volatile__(
        LOCK "subl %1,%0"
        : "=m"(v->counter)
        : "ir"(i), "m"(v->counter));
}

/** * atomic_sub_and_test - subtract value from variable and test result * @i: integer value to subtract * @v: pointer of type atomic_t * * Atomically subtracts @i from @v and returns * true if the result is zero, or false for all * other cases. */
static __inline__ int atomic_sub_and_test(int i, atomic_t *v)
{
    unsigned char c;

    __asm__ __volatile__(
        LOCK "subl %2,%0; sete %1"
        : "=m"(v->counter), "=qm"(c)
        : "ir"(i), "m"(v->counter)
        : "memory");
    return c;
}

/** * atomic_inc - increment atomic variable * @v: pointer of type atomic_t * * Atomically increments @v by 1. */
static __inline__ void atomic_inc(atomic_t *v)
{
    __asm__ __volatile__(
        LOCK "incl %0"
        : "=m"(v->counter)
        : "m"(v->counter));
}

/** * atomic_dec - decrement atomic variable * @v: pointer of type atomic_t * * Atomically decrements @v by 1. */
static __inline__ void atomic_dec(atomic_t *v)
{
    __asm__ __volatile__(
        LOCK "decl %0"
        : "=m"(v->counter)
        : "m"(v->counter));
}

/** * atomic_dec_and_test - decrement and test * @v: pointer of type atomic_t * * Atomically decrements @v by 1 and * returns true if the result is 0, or false for all other * cases. */
static __inline__ int atomic_dec_and_test(atomic_t *v)
{
    unsigned char c;

    __asm__ __volatile__(
        LOCK "decl %0; sete %1"
        : "=m"(v->counter), "=qm"(c)
        : "m"(v->counter)
        : "memory");
    return c != 0;
}

/** * atomic_inc_and_test - increment and test * @v: pointer of type atomic_t * * Atomically increments @v by 1 * and returns true if the result is zero, or false for all * other cases. */
static __inline__ int atomic_inc_and_test(atomic_t *v)
{
    unsigned char c;

    __asm__ __volatile__(
        LOCK "incl %0; sete %1"
        : "=m"(v->counter), "=qm"(c)
        : "m"(v->counter)
        : "memory");
    return c != 0;
}

/** * atomic_add_negative - add and test if negative * @v: pointer of type atomic_t * @i: integer value to add * * Atomically adds @i to @v and returns true * if the result is negative, or false when * result is greater than or equal to zero. */
static __inline__ int atomic_add_negative(int i, atomic_t *v)
{
    unsigned char c;

    __asm__ __volatile__(
        LOCK "addl %2,%0; sets %1"
        : "=m"(v->counter), "=qm"(c)
        : "ir"(i), "m"(v->counter)
        : "memory");
    return c;
}

/** * atomic_add_return - add and return * @v: pointer of type atomic_t * @i: integer value to add * * Atomically adds @i to @v and returns @i + @v */
static __inline__ int atomic_add_return(int i, atomic_t *v)
{
    int __i;
    /* Modern 486+ processor */
    __i = i;
    __asm__ __volatile__(
        LOCK "xaddl %0, %1;"
        : "=r"(i)
        : "m"(v->counter), "0"(i));
    return i + __i;
}

static __inline__ int atomic_sub_return(int i, atomic_t *v)
{
    return atomic_add_return(-i, v);
}

#define atomic_inc_return(v) (atomic_add_return(1, v))
#define atomic_dec_return(v) (atomic_sub_return(1, v))

/* These are x86-specific, used by some header files */
#define atomic_clear_mask(mask, addr) \
    __asm__ __volatile__(LOCK "andl %0,%1"          \
                         :                          \
                         : "r"(~(mask)), "m"(*addr) \
                         : "memory")

#define atomic_set_mask(mask, addr) \
    __asm__ __volatile__(LOCK "orl %0,%1"          \
                         :                         \
                         : "r"(mask), "m"(*(addr)) \
                         : "memory")

#endif
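As a cross-check of what these operations should return, here is a hedged sketch of a few of them written with the GCC/Clang __atomic builtins instead of inline assembly. It reuses the names from the header above for easy comparison, but it is an alternative formulation for illustration, not part of the JOS port.

```c
#include <assert.h>

typedef struct {
    volatile int counter;
} atomic_t;

/* Atomically add i to v and return the new value. */
static inline int atomic_add_return(int i, atomic_t *v)
{
    return __atomic_add_fetch(&v->counter, i, __ATOMIC_SEQ_CST);
}

/* Atomically subtract i from v and return the new value. */
static inline int atomic_sub_return(int i, atomic_t *v)
{
    return __atomic_sub_fetch(&v->counter, i, __ATOMIC_SEQ_CST);
}

/* Atomically decrement v; returns 1 if the result is zero. */
static inline int atomic_dec_and_test(atomic_t *v)
{
    return __atomic_sub_fetch(&v->counter, 1, __ATOMIC_SEQ_CST) == 0;
}

/* Atomically add i to v; returns 1 if the result is negative. */
static inline int atomic_add_negative(int i, atomic_t *v)
{
    return __atomic_add_fetch(&v->counter, i, __ATOMIC_SEQ_CST) < 0;
}
```

On x86 the compiler typically lowers these to the same lock-prefixed instructions used in the header, and it takes care of the operand constraints and memory clobbers itself.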

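Next, the reader-writer lock is tested inside the kernel.

Two problems came up:

  • One is that the loop around asm volatile("pause"); tends to hang: execution never switches back to this CPU. While debugging I found that adding a cprintf before or after it makes it come back normally.

    while (rwlk->lock.locked)
    {
        cprintf("");
        asm volatile("pause");
    }
  • The other is how to design the in-kernel test.

    • Output from multiple cores may interleave, so keep the messages short.
    • Sharing a lock from user space is hard to do for now; Linux does it through files.
    • So two locks are used for the test.

    CPU 0 holds one as a writer lock and the other as a reader lock.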

    // test reader-writer lock
    rw_initlock(&lock1);
    rw_initlock(&lock2);
    
    dumb_wrlock(&lock1);
    cprintf("[rw] CPU %d gain writer lock1\n", cpunum());
    dumb_rdlock(&lock2);
    cprintf("[rw] CPU %d gain reader lock2\n", cpunum());
    
    // Starting non-boot CPUs
    boot_aps();
    
    cprintf("[rw] CPU %d going to release writer lock1\n", cpunum());
    dumb_wrunlock(&lock1);  
    cprintf("[rw] CPU %d going to release reader lock2\n", cpunum());
    dumb_rdunlock(&lock2);

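    Each core then takes the reader lock on lock1 and the writer lock on lock2. The asm volatile("pause"); is there to give the other cores time to come online so that the different interleavings can be observed.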

    dumb_rdlock(&lock1);
    cprintf("[rw] %d l1\n", cpunum());
    asm volatile("pause");
    dumb_rdunlock(&lock1);
    cprintf("[rw] %d unl1\n", cpunum());
    
    dumb_wrlock(&lock2);
    cprintf("[rw] %d l2\n", cpunum());
    asm volatile("pause");
    cprintf("[rw] %d unl2\n", cpunum());
    dumb_wrunlock(&lock2);

    With QEMU given four cores (CPUS=4), the run looks like this:

[rw] CPU 0 gain writer lock1
[rw] CPU 0 gain reader lock2
[MP] CPU 1 starting
[MP] CPU 2 starting
[MP] CPU 3 starting
[rw] CPU 0 going to release writer lock1
[rw] CPU 0 going to release reader lock2
[rw] 1 l1
[rw] 2 l1
[rw] 3 l1
[rw] 2 unl1
[rw] 2 l2
[rw] 3 unl1
[rw] 1 unl1
[rw] 2 unl2
[MP] CPU 2 sched
[rw] 3 l2
[rw] 3 unl2
[rw] 1 l2
[MP] CPU 3 sched
[rw] 1 unl2
[MP] CPU 1 sched

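As soon as CPU0 releases the writer lock on lock1, every core can take the reader lock on lock1. Then, once CPU2 has taken the writer lock on lock2, the other cores catch up: CPU3 and CPU1 only release lock1 and cannot take lock2 until CPU2 releases it.

This matches the expected behavior of a reader-writer lock. The plain reader-writer lock is now complete.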
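A note on the spin loop that hung unless a cprintf was added. One plausible explanation, which the source does not confirm, is that the locked field of the JOS spinlock is a plain unsigned rather than volatile, and asm volatile("pause") has no "memory" clobber, so the compiler is allowed to read the flag once and spin forever on the cached value. A function call such as cprintf forces a reload. The sketch below shows a spin-wait that does not depend on that side effect; cpu_relax and spin_until_clear are hypothetical names, and the max_spins bound exists only so the example terminates.

```c
#include <assert.h>

/* Pause hint plus a compiler barrier: the "memory" clobber tells the
 * compiler that memory may have changed, so it must reload values. */
static inline void cpu_relax(void)
{
#if defined(__i386__) || defined(__x86_64__)
    __asm__ __volatile__("pause" ::: "memory");
#else
    __asm__ __volatile__("" ::: "memory");
#endif
}

/* Spin until *flag becomes zero or max_spins iterations have passed.
 * Returns the number of iterations spent waiting. */
static unsigned spin_until_clear(const volatile unsigned *flag,
                                 unsigned max_spins)
{
    unsigned spins = 0;
    while (*flag != 0 && spins < max_spins) {
        cpu_relax();
        spins++;
    }
    return spins;
}
```

Either the volatile-qualified pointer or the "memory" clobber alone is enough to force a fresh load on each iteration; the sketch uses both.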

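Implementing PRWLock in JOS

There are a few key points first:

  • The PRWLock data structure
  • The lock operations themselves
  • What is called at scheduling time
  • Testing PRWLock

The PRWLock data structure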

enum lock_status
{
    FREE = 0,
    LOCKED,
    PASS,
    PASSIVE
};

struct percpu_prwlock
{
    enum lock_status reader;
    atomic_t version;
};

typedef struct prwlock
{
    enum lock_status writer;
    struct percpu_prwlock lockinfo[NCPU];
    atomic_t active;
    atomic_t version;
} prwlock;

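Besides its main version and the number of ACTIVE readers, a prwlock also stores, for every core, the version that core last observed and the state of the reader on that core. The lockinfo array indexes this per-core information directly.

All reader-writer locks owned by the kernel are indexed through locklist; each lock is added to the list when it is initialized.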

extern unsigned int prwlocknum;
extern prwlock *locklist[MAXPRWLock];

Lock operations

Initialization sets all the initial values and adds the lock to the list:

void prw_initlock(prwlock *rwlk)
{
    int i = 0;
    rwlk->writer = FREE;
    for (i = 0; i < NCPU; i++)
    {
        rwlk->lockinfo[i].reader = FREE;
        atomic_set(&rwlk->lockinfo[i].version, 0);
    }
    atomic_set(&rwlk->active, 0);
    atomic_set(&rwlk->version, 0);
    locklist[prwlocknum++] = rwlk;
}

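The rest follows the pseudocode in the paper; only the functions actually called differ somewhat.

The writer lock includes sending IPIs to the other cores. This is only a sketch, so the PASS handling is not written out; it could be implemented by adding a waiting flag.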

void prw_wrlock(prwlock *rwlk)
{
    int newVersion;
    int id = 0;
    unsigned int corewait = 0;
    if (rwlk->writer == PASS)
        return;
    rwlk->writer = LOCKED;
    newVersion = atomic_inc_return(&rwlk->version);
    for (id = 0; id < ncpu; id++)
    {
#ifdef TESTPRW
        cprintf("CPU %d Ver %d\n", id, atomic_read(&rwlk->lockinfo[id].version));
#endif
        if (id != cpunum() && atomic_read(&rwlk->lockinfo[id].version) != newVersion)
        {
            lapic_ipi_dest(id, PRWIPI);
            corewait |= binlist[id];
#ifdef TESTPRW
            cprintf("send ipi %d\n", id);
#endif
        }
    }
    for (id = 0; id < ncpu; id++)
    {
        if (corewait & binlist[id])
        {
            while (atomic_read(&rwlk->lockinfo[id].version) != newVersion)
                asm volatile("pause");
        }
    }
    while (atomic_read(&rwlk->active) != 0)
    {
        lock_kernel();
        sched_yield();
    }
}

void prw_wrunlock(prwlock *rwlk)
{
    // if someone waiting to gain write lock rwlk->writer should be PASS
    rwlk->writer = FREE;
}

void prw_rdlock(prwlock *rwlk)
{
    struct percpu_prwlock *st;
    int lockversion;
    st = &rwlk->lockinfo[cpunum()];
    st->reader = PASSIVE;
    while (rwlk->writer != FREE)
    {
        st->reader = FREE;
        lockversion = atomic_read(&rwlk->version);
        atomic_set(&st->version, lockversion);
        while (rwlk->writer != FREE)
            asm volatile("pause");
        st = &rwlk->lockinfo[cpunum()];
        st->reader = PASSIVE;
    }
}

void prw_rdunlock(prwlock *rwlk)
{
    struct percpu_prwlock *st;
    int lockversion;
    st = &rwlk->lockinfo[cpunum()];
    if (st->reader == PASSIVE)
        st->reader = FREE;
    else
        atomic_dec(&rwlk->active);
    lockversion = atomic_read(&rwlk->version);
    atomic_set(&st->version, lockversion);
}
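prw_wrlock records the cores it must wait for with corewait |= binlist[id], but binlist is not shown. Assuming it is simply a table of single-bit masks (my guess, not confirmed by the source), the first loop of the writer can be modeled as below. NCPU, core_bit and cores_to_wait are hypothetical names for this sketch.

```c
#include <assert.h>

#define NCPU 8 /* hypothetical upper bound on cores for this sketch */

/* Assumed equivalent of binlist[id]: one bit per core. */
static unsigned int core_bit(int id)
{
    assert(id >= 0 && id < NCPU);
    return 1u << id;
}

/* Model of the writer's first loop: every core other than the writer
 * whose recorded version differs from new_version would get an IPI
 * and is added to the wait mask. */
static unsigned int cores_to_wait(const int *versions, int ncpu,
                                  int self, int new_version)
{
    unsigned int mask = 0;
    for (int id = 0; id < ncpu; id++)
        if (id != self && versions[id] != new_version)
            mask |= core_bit(id);
    return mask;
}
```

The second loop of prw_wrlock then spins only on the cores whose bit is set in the mask, so cores that had already reported the new version cost nothing.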

The handler each core runs when it receives PRWIPI:

void prw_ipi_report(struct Trapframe *tf)
{
    int lockversion, i;
    struct percpu_prwlock *st;
    cprintf("In IPI_report CPU %d\n", cpunum());
    for (i = 0; i < prwlocknum; i++)
    {
        st = &locklist[i]->lockinfo[cpunum()];
        if (st->reader != PASSIVE)
        {
            lockversion = atomic_read(&locklist[i]->version);
            atomic_set(&st->version, lockversion);
        }
    }
}

What is called at scheduling time

Scheduling must process every lock, so it walks locklist:

// Implement PRWLock
    if (prwlocknum != 0)
        for (j = 0; j < prwlocknum; j++)
            prw_sched(locklist[j]);

prw_sched itself is:

void prw_sched(prwlock *rwlk)
{
    struct percpu_prwlock *st;
    int lockversion;
    st = &rwlk->lockinfo[cpunum()];
    if (st->reader == PASSIVE)
    {
        atomic_inc(&rwlk->active);
        st->reader = FREE;
    }
    lockversion = atomic_read(&rwlk->version);
    atomic_set(&st->version, lockversion);
}

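Testing PRWLock

Testing PRWLock is also fairly involved. Because we use a big kernel lock, it is hard to test inside the kernel, so the test runs during initialization, before round-robin scheduling starts. A new IPI is introduced for the test.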

void prw_debug(struct Trapframe *tf)
{
    int needlock = 0;
    cprintf("====CPU %d in prw debug====\n",cpunum());
    if(kernel_lock.cpu == thiscpu && kernel_lock.locked == 1)
    {
        unlock_kernel();
        needlock = 1;
    }
    prw_wrlock(&lock1);
    cprintf("====%d gain lock1====\n",cpunum());
    prw_wrunlock(&lock1);
    cprintf("====%d release lock1====\n",cpunum());
    if(needlock)
        lock_kernel();
}

Send the DEBUGPRW interrupt to one core, i.e. make it take the writer lock on lock1.

#ifdef TESTPRW
    unlock_kernel();
    prw_initlock(&lock1);
    prw_wrlock(&lock1);
    prw_wrunlock(&lock1);
    prw_rdlock(&lock1);
    cprintf("====%d Gain Reader Lock====\n", cpunum());
    lapic_ipi_dest(3, DEBUGPRW);
    for (int i = 0; i < 10000; i++)
        asm volatile("pause");
    prw_rdunlock(&lock1);
    cprintf("====%d release Reader Lock====\n", cpunum());
    lock_kernel();
#endif 

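unlock_kernel is called first so that the other cores are not prevented from receiving interrupts; lock_kernel is called again at the end, and only then can sched start.

The test runs with 6 cores: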

SMP: CPU 0 found 6 CPU(s)
enabled interrupts: 1 2 4
[MP] CPU 1 starting
[MP] CPU 2 starting
[MP] CPU 3 starting
[MP] CPU 4 starting
[MP] CPU 5 starting
[MP] CPU 1 sched
[MP] CPU 2 sched
[MP] CPU 3 sched
[MP] CPU 4 sched
[MP] CPU 5 sched
CPU 0 Ver 0
CPU 1 Ver 0
send ipi 1
CPU 2 Ver 0
send ipi 2
CPU 3 Ver 0
send ipi 3
CPU 4 Ver 0
send ipi 4
CPU 5 Ver 0
====0 Gain Reader Lock====
In IPI_report CPU 1
In IPI_report CPU 2
In IPI_report CPU 4
FS is running
====CPU 3 in prw debug====
FS can do I/O
CPU 0 Ver 0
Dsend ipi 0
evice 1 presence: 1
CPU 1 Ver 1
send ipi 1
CPU 2 Ver 2
CPU 3 Ver 1
CPU 4 Ver 1
send ipi 4
CPU 5 Ver 1
send ipi 5
In IPI_report CPU 5
$ block cache is good
superblock is good
bitmap is good
alloc_block is good
file_open is good
file_get_block is good
file_flush is good
file_truncate is good
file rewrite is good
====0 release Reader Lock====
Init finish! Sched start...
====3 gain lock1====
====3 release lock1====

CPU3 can acquire lock1 only after CPU0 has released the reader lock, so the test passes.
