@node
在早期的 Linux內核中,併發的來源相對較少。早期內核不支持對稱多處理( symmetric multi processing,SMP),所以,致使併發執行的惟一緣由是對硬件中斷的服務。這種狀況處理起來較爲簡單,但並不適用於爲得到更好的性能而使用更多處理器且強調快速響應事件的系統。編程
爲了響應現代硬件和應用程序的需求, Linux內核已經發展到同時處理更多事情的時代。Linux系統是個多任務操做系統,會存在多個任務同時訪問同一片內存區域的狀況,這些任務可能會相互覆蓋這段內存中的數據,形成內存數據混亂。針對這個問題必需要作處理,嚴重的話可能會致使系統崩潰。如今的 Linux系統併發產生的緣由很複雜,總結一下有下面幾個主要緣由:數據結構
只要咱們的程序在運轉當中,就有可能發生併發和競爭。好比,當兩個執行線程須要訪問相同的數據結構(或硬件資源)時,混合的可能性就永遠存在。所以在設計本身的驅動程序時,就應該避免資源的共享。若是沒有併發的訪問,也就不會有競態的產生。所以,仔細編寫的內核代碼應該具備最少的共享。這種思想的最明顯應用就是避免使用全局變量。若是咱們將資源放在多個執行線程都會找到的地方(臨界區),則必須有足夠的理由。多線程
如何防止咱們的數據被併發訪問呢?這個時候就要創建一種保護機制,下面介紹幾種內核提供的幾種併發和競爭的處理方法。架構
原子,在早接觸到是在化學概念中。原子指化學反應不可再分的基本微粒。一樣的,在內核中所說的原子操做表示這一個訪問是一個步驟,必須一次性執行完,不能被打斷,不能再進行拆分。
例如,在多線程訪問中,咱們的線程一對a進行賦值操做,a=1
,線程二也對a進行賦值操做a=2
,咱們理想的執行順序是線程一先執行,線程二再執行。可是頗有可能在線程一執行的時候被其餘操做打斷,使得線程一最後的執行結果變爲a=2
。要解決這個問題,必須保證咱們的線程一在對數據訪問的過程當中不能被其餘的操做打斷,一次性執行完成。併發
函數 | 描述 |
---|---|
ATOMIC_INIT(int i) | 定義原子變量的時候對其初始化。 |
int atomic_read(atomic_t*v) | 讀取 v的值,而且返回 |
void atomic_set(atomic_t *v, int i) | 向 v寫入 i值。 |
void atomic_add(int i, atomic_t *v) | 給 v加上 i值。 |
void atomic_sub(int i, atomic_t *v) | 從 v減去 i值。 |
void atomic_inc(atomic_t *v) | 給 v加 1,也就是自增。 |
void atomic_dec(atomic_t *v) | 從 v減 1,也就是自減 。 |
int atomic_dec_return(atomic_t *v) | 從 v減 1,而且返回v的值 。 |
int atomic_inc_return(atomic_t *v) | 給 v加 1,而且返回 v的值。 |
int atomic_sub_and_test(int i, atomic_t *v) | 從 v減 i,若是結果爲0就返回真,不然就返回假 |
int atomic_dec_and_test(atomic_t *v) | 從 v減 1,若是結果爲0就返回真,不然就返回假 |
int atomic_inc_and_test(atomic_t *v) | 給 v加 1,若是結果爲0就返回真,不然就返回假 |
int atomic_add_negative(int i, atomic_t *v) | 給 v加 i,若是結果爲負就返回真,不然返回假 |
注:64位的整型原子操做只是將「atomic_」前綴換成「atomic64_」,將int換成long long。異步
函數 | 描述 |
---|---|
void set_bit(int nr, void *p) | 將p地址的nr位置1 |
void clear_bit(int nr,void *p) | 將p地址的nr位清零 |
void change_bit(int nr, void *p) | 將p地址的nr位反轉 |
int test_bit(int nr, void *p) | 獲取p地址的nr位的值 |
int test_and_set_bit(int nr, void *p) | 將p地址的nr位置1,而且返回nr位原來的值 |
int test_and_clear_bit(int nr, void *p) | 將p地址的nr位清0,而且返回nr位原來的值 |
int test_and_change_bit(int nr, void *p) | 將p地址的nr位翻轉,而且返回nr位原來的值 |
/* 定義原子變量,初值爲1*/ static atomic_t xxx_available = ATOMIC_INIT(1); static int xxx_open(struct inode *inode, struct file *filp) { ... /* 經過判斷原子變量的值來檢查LED有沒有被別的應用使用 */ if (!atomic_dec_and_test(&xxx_available)) { /*小於0的話就加1,使其原子變量等於0*/ atomic_inc(&xxx_available); /* LED被使用,返回忙*/ return - EBUSY; } ... /* 成功 */ return 0; static int xxx_release(struct inode *inode, struct file *filp) { /* 關閉驅動文件的時候釋放原子變量 */ atomic_inc(&xxx_available); return 0; }
上面咱們介紹了原子變量,從它的操做函數能夠看出,原子操做只能針對整型變量或者位。假如咱們有一個結構體變量須要被線程A所訪問,在線程A訪問期間不能被其餘線程訪問,這怎麼辦呢?自旋鎖就能夠完成對結構體變量的保護。函數
自旋鎖,顧名思義,咱們能夠把他理解成廁所門上的一把鎖。這個廁所門只有一把鑰匙,當咱們進去時,把鑰匙取下來,進去後反鎖。那麼當第二我的想進來,必須等咱們出去後才能夠。當第二我的在外面等待時,可能會一直等待在門口轉圈。性能
咱們的自旋鎖也是這樣,自旋鎖只有鎖定和解鎖兩個狀態。當咱們進入拿上鑰匙進入廁所,這就至關於自旋鎖鎖定的狀態,期間誰也不能夠進來。當第二我的想要進來,這至關於線程B想要訪問這個共享資源,可是目前不能訪問,因此線程B就一直在原地等待,一直查詢是否能夠訪問這個共享資源。當咱們從廁所出來後,這個時候就「解鎖」了,只有再這個時候線程B才能訪問。優化
假如,在廁所的人待的時間太長怎麼辦?外面的人一直等待嗎?若是換作是咱們,確定不會這樣,簡直浪費時間,可能咱們會尋找其餘方法解決問題。自旋鎖也是這樣的,若是線程A持有自旋鎖時間過長,顯然會浪費處理器的時間,下降了系統性能。咱們知道CPU最偉大的發明就在於多線程操做,這個時候讓線程B在這裏傻傻的不知道還要等待多久,顯然是不合理的。所以,若是自旋鎖只適合短時間持有,若是遇到須要長時間持有的狀況,咱們就要換一種方式了(下文的互斥體)。
函數 | 描述 |
---|---|
DEFINE_SPINLOCK(spinlock_t lock) | 定義並初始化一個自旋變量 |
int spin_lock_init(spinlock_t *lock) | 初始化自旋鎖 |
void spin_lock(spinlock_t *lock) | 獲取指定的自旋鎖,也叫加鎖 |
void spin_unlock(spinlock_t *lock) | 釋放指定的自旋鎖。 |
int spin_trylock(spinlock_t *lock) | 嘗試獲取指定的鎖,若是沒有獲取到,返回0 |
int spin_is_locked(spinlock_t *lock) | 檢查指定的自旋鎖是否被獲取,若是沒有被獲取返回非0,不然返回0. |
自旋鎖是主要爲了多處理器系統設計的。對於單處理器且內核不支持搶佔的系統,一旦進入了自旋狀態,則會永遠自旋下去。由於,沒有任何線程能夠獲取CPU來釋放這個鎖。所以,在單處理器且內核不支持搶佔的系統中,自旋鎖會被設置爲空操做。
以上列表中的函數適用於SMP或支持搶佔的單CPU下線程之間的併發訪問,也就是用於線程與線程之間,被自旋鎖保護的臨界區必定不能調用任何可以引發睡眠和阻塞(其實本質仍然是睡眠)的API函數,不然的話會可能會致使死鎖現象的發生。自旋鎖會自動禁止搶佔,也就說當線程A獲得鎖之後會暫時禁止內核搶佔。若是線程A在持有鎖期間進入了休眠狀態,那麼線程A會自動放棄CPU使用權。線程B開始運行,線程B也想要獲取鎖,可是此時鎖被A線程持有,並且內核搶佔還被禁止了!線程B沒法被調度岀去,那麼線程A就沒法運行,鎖也就沒法釋放死鎖發生了!
當線程之間發生併發訪問時,若是此時中斷也要插一腳,中斷也想訪問共享資源,那該怎麼辦呢?首先能夠確定的是,中斷裏面使用自旋鎖,可是在中斷裏面使用自旋鎖的時候,在獲取鎖以前必定要先禁止本地中斷(也就是本CPU中斷,對於多核SOC來講會有多個CPU核),不然可能致使鎖死現象的發生。看下下面一個例子:
//線程A spin_lock(&lock); ....... functionA(); ....... spin_unlock(&lock); //中斷髮生,運行線程B spin_lock(&lock); ....... functionA(); ....... spin_unlock(&lock);
線程A先運行,而且獲取到了lock這個鎖,當線程A運行 functionA函數的時候中斷髮生了,中斷搶走了CPU使用權。下邊的中斷服務函數也要獲取lock這個鎖,可是這個鎖被線程A佔有着,中斷就會一直自旋,等待鎖有效。可是在中斷服務函數執行完以前,線程A是不可能執行的,線程A說「你先放手」,中斷說「你先放手」,場面就這麼僵持着死鎖發生!
使用了自旋鎖以後能夠保證臨界區不受別的CPU和本CPU內的搶佔進程的打擾,可是獲得鎖的代碼在執行臨界區的時候,還可能受到中斷和底半部的影響,爲了防止這種影響,建議使用如下列表中的函數:
函數 | 描述 |
---|---|
void spin_lock_irq(spinlock_t *lock) | 禁止本地中斷,並獲取自旋鎖 |
void spin_unlock_irq(spinlock_t *lock) | 激活本地中斷,並釋放自旋鎖 |
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags) | 保存中斷狀態,禁止本地中斷,並獲取自旋鎖 |
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags) | 將中斷狀態恢復到之前的狀態,而且激活本地中斷,釋放自旋鎖 |
在多核編程的時候,若是進程和中斷可能訪問同一片臨界資源,咱們通常須要在進程上下文中調用spin_ lock irqsave() spin_unlock_irqrestore(),在中斷上下文中調用 spin_lock() spin _unlock()。這樣,在CPU上,不管是進程上下文,仍是中斷上下文得到了自旋鎖,此後,若是CPU1不管是進程上下文,仍是中斷上下文,想得到同一自旋鎖,都必須忙等待,這避免一切核間併發的可能性。同時,因爲每一個核的進程上下文持有鎖的時候用的是 spin_lock_irgsave(),因此該核上的中斷是不可能進入的,這避免了核內併發的可能性。
DEFINE_SPINLOCK(lock) /* 定義並初始化一個鎖 */ /* 線程A */ void functionA (){ unsigned long flags; /* 中斷狀態 */ spin_lock_irqsave(&lock, flags) /* 獲取鎖 */ /* 臨界區 */ spin_unlock_irqrestore(&lock, flags) /* 釋放鎖 */ } /* 中斷服務函數 */ void irq() { spin_lock(&lock) /* 獲取鎖 */ /* 臨界區 */ spin_unlock(&lock) /* 釋放鎖 */ }
static int xxx_open(struct inode *inode, struct file *filp) { ... spinlock(&xxx_lock); if (xxx_count) {/* 已經打開*/ spin_unlock(&xxx_lock); return -EBUSY; } xxx_count++;/* 增長使用計數*/ spin_unlock(&xxx_lock); ... return 0;/* 成功 */ } static int xxx_release(struct inode *inode, struct file *filp) { ... spinlock(&xxx_lock); xxx_count--;/* 減小使用計數*/ spin_unlock(&xxx_lock); return 0; }
當臨界區的一個文件能夠被同時讀取,可是並不能被同時讀和寫。若是一個線程在讀,另外一個線程在寫,那麼極可能會讀取到錯誤的不完整的數據。讀寫自旋鎖是能夠容許對臨界區的共享資源進行併發讀操做的。可是並不容許多個線程併發讀寫操做。若是想要併發讀寫,就要用到了順序鎖。
讀寫自旋鎖的讀操做函數以下所示:
函數 | 描述 |
---|---|
DEFINE_RWLOCK(rwlock_t lock) | 定義並初始化讀寫鎖 |
void rwlock_init(rwlock_t *lock) | 初始化讀寫鎖 |
void read_lock(rwlock_t *lock) | 獲取讀鎖 |
void read_unlock(rwlock_t *lock | 釋放讀鎖 |
void read_unlock_irq(rwlock_t *lock) | 打開本地中斷,而且釋放讀鎖 |
void read_lock_irqsave(rwlock_t *lock,unsigned long flags) | 保存中斷狀態,禁止本地中斷,並獲取讀鎖 |
void read_unlock_irqrestore(rwlock_t *lock,unsigned long flags) | 將中斷狀態恢復到之前的狀態,而且激活本地中斷,釋放讀鎖 |
void read_lock_bh(rwlock_t *lock) | 關閉下半部,並獲取讀鎖 |
void read_unlock_bh(rwlock_t *lock) | 打開下半部,並釋放讀鎖 |
讀寫自旋鎖的寫操做函數以下所示:
函數 | 描述 |
---|---|
void write_lock(rwlock_t *lock) | 獲取寫鎖 |
void write_unlock(rwlock_t *lock) | 釋放寫鎖 |
void write_lock_irq(rwlock_t *lock) | 禁止本地中斷,而且獲取寫鎖。 |
void write_unlock_irq(rwlock_t *lock) | 打開本地中斷,而且釋放寫鎖 |
void write_lock_irqsave(rwlock_t *lock,unsigned long flags) | 保存中斷狀態,禁止本地中斷,並獲取寫鎖 |
void write_unlock_irqrestore(rwlock_t *lock,unsigned long flags) | 將中斷狀態恢復到之前的狀態,而且激活本地中斷,釋放寫鎖 |
void write_lock_bh(rwlock_t *lock) | 關閉下半部,並獲取寫鎖 |
void write_unlock_bh(rwlock_t *lock) | 打開下半部,並釋放寫鎖 |
rwlock_t lock; /* 定義rwlock */ rwlock_init(&lock); /* 初始化rwlock */ /* 讀時獲取鎖*/ read_lock(&lock); ... /* 臨界資源 */ read_unlock(&lock); /* 寫時獲取鎖*/ write_lock_irqsave(&lock, flags); ... /* 臨界資源 */ write_unlock_irqrestore(&lock, flags);
順序鎖是讀寫鎖的優化版本,讀寫鎖不容許同時讀寫,而使用順序鎖能夠完成同時進行讀和寫的操做,但並不容許同時的寫。雖然順序鎖能夠同時進行讀寫操做,但並不建議這樣,讀取的過程並不能保證數據的完整性。
順序鎖的讀操做函數以下所示:
函數 | 描述 |
---|---|
DEFINE_SEQLOCK(seqlock_t sl) | 定義並初始化順序鎖 |
void seqlock_ini seqlock_t *sl) | 初始化順序鎖 |
void write_seqlock(seqlock_t *sl) | 順序鎖寫操做 |
void write_sequnlock(seqlock_t *sl) | 獲取寫順序鎖 |
void write_seqlock_irq(seqlock_t *sl) | 禁止本地中斷,而且獲取寫順序鎖 |
void write_sequnlock_irq(seqlock_t *sl) | 打開本地中斷,而且釋放寫順序鎖 |
void write_seqlock_irqsave(seqlock_t *sl,unsigned long flags) | 保存中斷狀態,禁止本地中斷,並獲取寫順序 |
void write_sequnlock_irqrestore(seqlock_t *sl,unsigned long flags) | 將中斷狀態恢復到之前的狀態,而且激活本地中斷,釋放寫順序鎖 |
void write_seqlock_bh(seqlock_t *sl) | 關閉下半部,並獲取寫讀鎖 |
void write_sequnlock_bh(seqlock_t *sl) | 打開下半部,並釋放寫讀鎖 |
順序鎖的寫操做函數以下所示:
函數 | 描述 |
---|---|
DEFINE_RWLOCK(rwlock_t lock) | 讀單元訪問共享資源的時候調用此函數,此函數會返回順序鎖的順序號 |
unsigned read_seqretry(const seqlock_t *sl,unsigned start) | 讀結束之後調用此函數檢查在讀的過程當中有沒有對資源進行寫操做,若是有的話就要重讀 |
copy_from_user的使用是結合進程上下文的,由於他們要訪問「user」的內存空間,這個「user」必須是某個特定的進程。若是在驅動中使用這兩個函數,必須是在實現系統調用的函數中使用,不可在實現中斷處理的函數中使用。若是在中斷上下文中使用了,那代碼就極可能操做了根本不相關的進程地址空間。其次因爲操做的頁面可能被換出,這兩個函數可能會休眠,因此一樣不可在中斷上下文中使用。
信號量和自旋鎖有些類似,不一樣的是信號量會發出一個信號告訴你還須要等多久。所以,不會出現傻傻等待的狀況。好比,有100個停車位的停車場,門口電子顯示屏上實時更新的停車數量就是一個信號量。這個停車的數量就是一個信號量,他告訴咱們是否能夠停車進去。當有車開進去,信號量加一,當有車開出來,信號量減一。
好比,廁所一次只能讓一我的進去,當A在裏面的時候,B想進去,若是是自旋鎖,那麼B就會一直在門口傻傻等待。若是是信號量,A就會給B一個信號,你先回去吧,我出來了叫你。這就是一個信號量的例子,B聽到A發出的信號後,能夠先回去睡覺,等待A出來。
所以,信號量顯然能夠提升系統的執行效率,避免了許多無用功。信號量具備如下特色:
函數 | 描述 |
---|---|
DEFINE_SEAMPHORE(name) | 定義一個信號量,而且設置信號量的值爲1 |
void sema_init(struct semaphore *sem, int val) | 初始化信號量sem,設置信號量值爲val |
void down(struct semaphore *sem) | 獲取信號量,由於會致使休眠,所以不能在中斷中使用 |
int down_trylock(struct semaphore *sem); | 嘗試獲取信號量,若是能獲取到信號量就獲取,而且返回0.若是不能就返回非0,而且不會進入休眠 |
int down_interruptible(struct semaphore | 獲取信號量,和down相似,只是使用dow進入休眠狀態的線程不能被信號打斷。而使用此函數進入休眠之後是能夠被信號打斷的 |
void up(struct semaphore *sem) | 釋放信號量 |
struct semaphore sem; /* 定義信號量 */ sema_init(&sem, 1); /* 初始化信號量 表示只能由一個線程同時訪問這塊資源 */ down(&sem); /* 申請信號量 */ /* 臨界區 */ up(&sem); /* 釋放信號量 */
互斥體表示一次只有一個線程訪問共享資源,不能夠遞歸申請互斥體。
信號量也能夠用於互斥體,當信號量用於互斥時(即避免多個進程同時在一個臨界區中運行),信號量的值應初始化爲1.這種信號量在任何給定時刻只能由單個進程或線程擁有。在這種使用模式下,一個信號量有時也稱爲一個「互斥體( mutex)」,它是互斥( mutual exclusion)的簡稱。Linux內核中幾平全部的信號量均用於互斥。
函數 | 描述 |
---|---|
DEFINE_MUTEX(name) | 定義並初始化一個 mutex變量 |
void mutex_init(mutex *lock) | 初始化 mutex |
void mutex_lock(struct mutex *lock) | 獲取 mutex,也就是給 mutex上鎖。若是獲取不到就進休眠 |
void mutex_unlock(struct mutex *lock) | 釋放 mutex,也就給 mutex解鎖 |
int mutex_trylock(struct mutex *lock) | 判斷 mutex是否被獲取,若是是的話就返回,不然返回0 |
int mutex_lock_interruptible(struct mutex *lock) | 使用此函數獲取信號量失敗進入休眠之後能夠被信號打斷 |
struct mutex lock; /* 定義一個互斥體 */ mutex_init(&lock); /* 初始化互斥體 */ mutex_lock(&lock); /* 上鎖 */ /* 臨界區 */ mutex_unlock(&lock); /* 解鎖*/
互斥體和自旋鎖都是解決互斥問題的一種手段。互斥體是進程級別的,互斥體在使用的時候會發生進程間的切換,所以,使用互斥體資源開銷比較大。自旋鎖能夠節省上下文切換的時間,若是持有鎖的時間不長,使用自旋鎖是比較好的選擇,若是持有鎖時間比較長,互斥體顯然是更好的選擇。
你們的鼓勵是我繼續創做的動力,若是以爲寫的不錯,歡迎關注,點贊,收藏,轉發,謝謝!
有任何問題,都可經過公告中的二維碼聯繫我