iOS概念攻堅之路(五):線程同步方案

前言

多線程編程所處的環境是一個複雜的環境,線程之間穿插執行,須要使用必定的手段來保證程序的正確運行,這個手段就是同步。這篇文章分了兩個部分,第一部分會先介紹同步的概念,第二部分是 iOS 中能使用到的同步方案的一個分析以及具體如何使用。ios

線程同步的概念

爲何要同步

線程之間的關係是合做關係,既然是合做,那就得有某種約定的規則,不然合做就會出現問題。例如,第一個線程在執行了一些操做後想檢查當前的錯誤狀態 errno,但在其作出檢查以前,線程 2 卻修改了 errno。這樣,當第一個線程再次得到控制權後,檢查結果將是線程 2 改寫過的 errno,而這是不正確的:git

線程 1 線程 2
... ...
讀 errno 變量 ...
... 寫 errno 變量
從讀操做返回 ...
檢查 errno 值 ...

之因此出現上述問題,是基於兩個緣由:github

  • errno 是線程之間共享的全局變量
  • 線程之間的相對執行順序是不肯定的

解決上述問題有兩個方法:第一個是限制全局變量,給每一個線程一個私有的 errno 變量。事實上,若是能夠將全部的資源都私有化,讓線程之間不共享,那麼這種問題就不復存在。算法

問題是,若是全部的資源都不共享,那麼就不須要發明線程了,甚至也不必發明進程。線程和進程的設計初衷:共享資源,提升資源利用率。因此這種方式是不切實際的。編程

那剩下的方法就是解決線程之間的執行順序,方法就是同步。swift

同步的目的就是無論線程之間的執行如何穿插,都保證運行結果是正確的。數組

鎖的進化:金魚生存

這個例子是《計算機的心智操做系統之哲學原理(第2版)》中舉的例子,很是生動的描述了鎖的由來。安全

養過金魚的人都知道,金魚有一個很大的特色,就是沒有飽的感受。所以,金魚吃東西不會由於吃飽就中止。它們會不停的吃,直到脹死。所以,金魚池多少得由養金魚的人來肯定,其死活也由人控制。多線程

如今假定 A 和 B 兩我的合住一套公寓,共同養了一條金魚。該金魚天天進食一次。兩我的想把金魚養活,一天只能喂一次,也只能喂一次。若是一天內兩人都餵了魚,魚就脹死。若是一天內兩人都沒有餵魚,魚就餓死。併發

他們二人爲了把魚養好,既不讓魚脹死,也不讓魚餓死,作出以下約定:

  • 天天餵魚一次,且僅一次
  • 若是今天 A 餵了魚,B 幾天就不能再喂;反之亦然
  • 若是今天 A 沒有餵魚,B 今天就必須喂;反之亦然

顯然,要想保持魚活着,A 和 B 得進行某種合做。固然,最簡單的狀況是不進行任何溝通,每一個人以爲須要餵魚時,查看一下魚的狀態:若是感受到魚像是沒進過食,則餵魚;不然不喂。下圖給出的是沒有同步狀況下 A 和 B 所執行的程序:

A:                          B:
if (noFeed) {               if (noFeed) {
    feed fish;                  feed fish;
}                           }
複製代碼

那上述程序裏是如何判斷 noFedd 的值的呢?程序裏沒有給出,所以只能依靠 A 和 B 的高超養魚技術,即經過查看魚的外形來判斷金魚當天是否進食了。固然,只有高手才能達到這個水平,通常的人是看不出來的。萬一 A 或者 B 沒有看出對方已經餵過魚了,再喂一次,魚就脹死了。或者,沒有看出對方沒有餵過魚,而沒有喂,魚就餓死了。

即便假設 A 和 B 都是養魚高手,經過查看魚的外形就能夠判斷魚是否餵過,上述程序也能正確執行嗎?答案是否認的。因爲線程的執行能夠任意穿插,A 能夠先檢查魚,發現沒有喂,就準備餵魚。但就在 A 準備喂但還沒有喂的時候,程序切換,輪到 B 執行。B 一看,魚尚未喂(確實如此),就餵魚。在喂完魚後,線程再次切換到 A。此時 A 從檢查完魚狀態後的指令開始執行,就是餵魚。這樣魚被餵了兩次,魚就脹死了。

事件時序表
時序 A B
13:00 查看魚(沒喂) ...
13:05 ... 查看魚(沒喂)
13:10 ... 餵魚
13:25 餵魚 ...
魚脹死

爲何這個程序會出現魚脹死的狀況呢?由於 A 和 B 兩我的同時執行了同一段代碼(if (noFeed) feed fish)。兩個或多個線程爭相執行同一段代碼或訪問同一資源的現象稱爲 競爭(race)。這個可能形成競爭的共享代碼段或資源稱爲 臨界區(critical section)

固然,咱們知道兩個線程不可能真的在同一時刻執行(單核狀況)。但有可能在同一時刻兩個線程都在同一段代碼上。這個例子裏競爭的是代碼,是代碼競爭。若是是兩個線程同時訪問一個數據就叫作數據競爭。這個程序形成魚脹死的就是由於兩個線程同時進入了臨界區。

以人類進化來講,此程序只至關與氨基酸階段,胡亂競爭,並不具有任何協調能力。

變形蟲階段

要防止魚脹死,就須要防止競爭,要想避免競爭,就須要防止兩個或多個線程同時進入臨界區。要達到這一點,就須要某種協調手段。

協調的目的就是在任什麼時候刻都只能有一我的在臨界區裏,這稱爲 互斥(mutual exclusion)。互斥就是說一次只有一我的使用共享資源,其餘人皆排除在外。正確互斥須要知足 4 個條件:

  • 不能有兩個進程同時在臨界區裏面
  • 進程可以在任何數量和速度的 CPU 上正確執行
  • 在互斥區域外不能阻止另外一個進程的運行
  • 進程不能無限制的等待進入臨界區

若是任何一個條件不知足,那麼設計的互斥就是不正確的。

那麼有沒有辦法確保一次只有一我的在臨界區呢?有,讓兩個線程協調。固然,最簡單的協調方法是交談。問題是 A 和 B 不必定有時間碰面,那麼剩下的辦法是留紙條。由此,得到第一種同步方式:A 和 B 商定,每一個人在餵魚以前先留下字條,告訴對方本身將檢查魚缸並在須要時餵魚:

A:                          B:
if (noNote) {               if (noNote) {
    leave note;                 leave note;
    if (noFeed) {               if (noFeed) {
        feed fish;                  feed fish;
    }                           }
    remove note;                remove note;
}                           }
複製代碼

上述機制可否避免魚脹死呢?不能,若是 A 和 B 交叉執行上述程序,仍是會形成魚脹死的結局,這是由於雖然使用的是互斥的手段,即留字條,卻沒有達到互斥的目的。由於字條並無方式 A 和 B 兩我的同時進入臨界區。固然,與第一個解決方案比起來,本方案仍是有所改善,即魚脹死的機率下降了。

只有在 A 和 B 嚴格交叉執行的狀況下,纔可能發生魚脹死的現象。所以,咱們並不是徹底在白費力氣。

事件時序表
時序 A B
3:00 檢查字條(沒有) ...
3:05 ... 檢查字條(沒有)
3:10 ... 留下字條
3:25 留下字條 ...
3:50 查看魚(沒有餵過) ...
4:05 ... 查看魚(沒有餵過)
4:10 ... 餵魚
4:25 餵魚 ...
魚脹死

此程序雖然加入了一點同步機制,但這個機制太原始,達不到真正的同步目的。以人類進化來比喻,此程序至關於變形蟲階段。

魚階段

仔細分析能夠發現,上述程序不解決問題的緣由是咱們先檢查有沒有字條,後留字條。這樣在檢查字條和留字條之間就留下了空當。那麼咱們就修改一下順序,先留字條,再檢查有沒有對方的字條。若是沒有對方的字條,那麼就餵魚,喂完把字條拿掉。不過這種方法須要區分字條是誰的,咱們獲得以下程序(第二種同步方案):

A:                          B:
leave noteA;                leave noteB;
if (no noteB) {             if (no noteA) {
    if (no feed) {              if (noFeed) {
        feed fish;                  feed fish;
    }                           }
}                           }
remove noteA;               remove noteB;
複製代碼

上述程序可以保證魚不會被脹死,由於不管按照什麼順序穿插,總有一我的的留字條指令在另外一我的的檢查字條指令前執行,從而將防止兩我的同時進入臨界區,於是魚不會由於兩我的都喂而脹死。

可是,魚卻有可能餓死:

事件時序表
時序 A B
3:00 ... 留字條 noteB
3:05 留字條 A 檢查字條(沒有)
3:10 檢查字條 noteB(有) ...
3:25 ... 檢查字條 noteA(有)
3:50 ... 移除字條 noteB
4:05 移除字條 noteA ...
沒有人餵魚,魚餓死

雖然存在餓死的狀況,可是咱們的力氣並無白費。對於一個計算機系統來講,餓死好於脹死。若是脹死,則程序的運行極可能出錯:幾個線程同時得到同一個資源,出現不一致性及結果不肯定性幾乎是難以免的。但若是是餓死,即你們都拿不到某個資源,線程處於飢餓狀態,至可能是中止推動,而這不必定產生錯誤結果,或許只是推遲結果的出現。

雖然餓死比脹死好受一點,但畢竟仍是存在死的可能,仍是在很原始的階段。以人類進化來比,至關於魚階段。所以,咱們須要繼續進化,或者說努力。

猴階段

那麼爲何魚會餓死呢?是由於沒有人進入臨界區。雖然互斥確保了沒有兩我的同時進入臨界區,但這種沒有人進入臨界區的狀況則有點互斥過了頭。要想魚不餓死,還要保證有一我的進入臨界區來餵魚。那用什麼辦法來保證呢?

辦法就是讓某我的等着,直到確認有人爲了魚才離去,不要一見到對方的字條就開溜走人。也就是說,在兩我的同時留下字條的狀況下,必須選擇某我的來餵魚,因而得出第 3 種同步方式:

A:                          B:
leave noteA;                leave noteB;
while(noteB) {              
    do nothing;
}
                            if (no noteA) {
if (noFeed) {                   if (noFeed) {
    feed fish;                      feed fish;
}                               }
                            }
remove noteA;               remove noteB;
複製代碼

魚顯然不會脹死,由於使用的辦法包括了第 2 種同步方式。那麼魚會不會餓死呢?也不會,由於前面說過,魚餓死的惟一狀況是兩我的同時留字條,而且又都走人。而上述程序在兩我的都留字條的狀況下,A 不會走人,而是一直循環等待直到對方刪除字條後,再檢查魚有沒有喂,並在沒有喂的狀況下餵魚。所以,該同步方式既防止了脹死,又防止了餓死。

這一階段算是猴階段,魚既不會脹死,也不會餓死,但這還不夠。

猴階段的同步機制雖然正確,但存在不少問題。

首先是程序不對稱。A 執行的程序和 B 執行的程序並不同。那不對稱有什麼問題嗎?固然有,不對稱形成程序編寫困難,爲了追求程序的正確性,即便是作一樣操做的線程也得編寫得不一樣,這天然就增長了編程的難度。不對稱還形成程序證實的困難,要想從理論上證實第 3 種同步方式程序的正確性是一件十分複雜的事情,這一點研究程序證實的人是很清楚的。

上述程序的另外一個大問題是浪費,A 執行的循環等待是一種很大的浪費,但浪費還不是循環等待的惟一問題,它還可能形成 CPU 調度的 優先級反轉(倒掛)。優先級反轉就是高優先級的線程等待低優先級的線程。例如,假如 B 先於 A 啓動,留下字條後正準備檢查是否有 A 的字條時,A 啓動。因爲 A 的優先級高於 B,所以 A 得到 CPU,留下字條,進入循環等待。因爲 A 的優先級高,所以 B 沒法得到 CPU 而完成剩下的工做,進而形成 A 始終處於循環等待階段沒法推動。這樣高優先級的 A 就被低優先級的 B 所阻塞。因爲優先級反轉徹底違反了設立優先級的初衷,因此使人沒法容忍。

那咱們只能對同步方案進行改進,那麼在哪個方案的基礎上改進呢?咱們天然會想到最後一個方案,由於它已經知足了魚既不餓死也不脹死的條件,無非就是很差看和循環等待。關鍵是這兩點能夠改進嗎?答案是否認的,循環等待不能去掉,一去掉就變成第 2 個方案;若想使其對稱、美觀,就須要將 B 改成和 A 一樣,而這樣一樣會形成魚餓死的可能。所以對最後一個方案進行修改彷佛不是明智之舉。

新的思路就是直接最開始的兩個方案進行修改。因爲最開始的兩個方案均達不到既不餓死又不脹死的條件,所以咱們天然選擇一個較爲美觀、簡單的方案來修改。在兩個方案之間,第 1 個方案徹底對稱,而第 2 個方案不徹底對稱,由於每一個人的字條不一樣。所以,咱們選擇第 1 個方案做爲修改的基礎。但如何修改呢?

要想知道如何修改,就得知道第 1 個方案爲何不知足條件。

那麼第 1 個方案爲何不知足條件呢?咱們說過,是由於檢查字條和留字條是兩個步驟,中間留有被別的線程穿插的空當,從而形成字條做用的喪失。咱們就想,可否將這兩個步驟併爲一個步驟,或者變成一個原子操做,使其中間不留空當,不就解決問題了嗎?

換句話說,咱們之因此到如今還沒把金魚問題處理掉,是由於咱們一直在很是低的層次上打轉。由於咱們試圖工做的層面是魚和魚缸這個層面,即留字條是爲了防止兩我的同時查看魚缸。咱們僅僅在指令層上進行努力。因爲控制的單元是一條條的指令,所以對指令之間的空當無能爲力。而解決這種問題的辦法就是提升抽象的層次,將控制的層面上升到對一組指令的控制。

例如,在金魚問題裏,若是咱們將抽象層次從保護魚和魚缸的層次提升到保護放置魚缸的房間的層次,這個問題就能夠解決。這樣,檢查字條和留字條的兩步操做就變成將房間鎖上的一步操做。

那麼如何保證這個房間一次只進入一我的呢?咱們先看看生活當中咱們是如何確保一個房間只能進入一我的的。例如,兩個教師都想使用同一個教室來爲學生補課,怎麼協調呢?進到教室後將門鎖上,另一個教師就沒法進來使用教室了。即教室是用鎖來保證互斥的。那麼在操做系統裏,這種能夠保證互斥的同步機制稱爲

有了鎖,金魚問題就能夠解決了。當一我的進來想餵魚時,就把放有魚缸的房間鎖住,這樣另一我的進不來,天然沒法餵魚,以下所示:

A:                          B:
lock();                     lock()
if (noFeed) {               if (noFeed) {
    feed fish;                  feed fish;
}                           }
unlock();                   unlock();
複製代碼

從上面程序咱們能夠看到,基於鎖的互斥性,A 和 B 只能有一我的進入房間來餵魚,所以魚不會脹死。而且,若是兩人都同時執行上述程序時,因爲先拿到鎖的人會進入房間餵魚,所以魚也不會餓死。更爲重要的是,兩我的執行徹底一樣的代碼。既對稱,也容易寫,證實起來也不困難。這樣,金魚問題從而獲得解決。

一個正常鎖應該具有的特性:

  • 鎖的初始狀態是打開狀態
  • 進臨界區時必須得到鎖
  • 出臨界區時必須打開鎖
  • 若是別人持有鎖則必須等待

第一種方案之所謂沒有將資源鎖住,是由於違反了第 4 個條件,即在別人持有鎖(留下字條)的狀況下,也照樣進入了臨界區(由於檢查是否別人持有鎖在別人留鎖以前進行)。所以,這個字條沒法起到鎖的做用。

以人類進化來比喻,上述程序至關於人階段了。

那麼這個程序還有什麼問題沒有?若是 A 正在餵魚的話,B 能幹什麼事情嗎?只能等待(等待鎖變爲打開狀態)。若是 A 餵魚的動做很慢,B 等待的事件就會很長。而這種繁忙等待不只將形成浪費,並且將下降系統效率。那有沒有辦法消除鎖的繁忙等待呢?答案是否認的,由於鎖的特性就是在別人持有鎖的狀況下須要等待。不過仍是能夠減小繁忙等待的時間長度。怎麼縮短等待的時間呢?

仔細分析發現,A 餵魚並不須要在持有鎖的狀態下進行。咱們就但願餵魚的這段時間不要放在鎖裏面,而是得到鎖後留下字條說它餵魚去了,而後釋放鎖,再餵魚。而 B 在拿到鎖後先檢查有沒有字條,有字條就釋放鎖,幹別的去。沒有就留字條,而後釋放鎖,再餵魚。這樣,因爲持鎖的時間只限於設置字條的事件,所以,對方循環等待的時間會很短,而真正的操做(在這裏是餵魚)則隨便多慢也沒有問題了。

A:                              B:
lock();                         lock();
if (no NoteB) {                 if (no NoteA) {
    leave noteA;                    leave noteB;
}                               }
unlock();                       unlock();
if (no NoteB) {                 if (no NoteA) {
    if (noFeed) {                   if (noFeed) {
        feed fish;                      feed fish;
    }                               }
    remove note;                    remove note;
}                               }
複製代碼

這個方法使得鎖上的繁忙等待時間變得不多。但無論怎樣,終究仍是須要等待的。那有沒有辦法不用進行任何繁忙等待呢?有,答案就是睡覺與叫醒,即 sleepwakeup

睡覺與叫醒:生產者與消費者問題

什麼是睡覺與喚醒呢?就是若是對方持有鎖,你就不須要等待鎖變爲打開狀態,而是去睡覺,鎖打開後對方再來把你叫醒。咱們下面用生產者與消費者的問題來演示這個機制。

生產者生產的產品由消費者來消費,但消費者通常不直接從生產者手裏獲取產品,而是經過一箇中介機構,好比商店。生產者把東西放在這裏,消費者到這裏來拿。爲何須要這個中介機構呢?這是由於商店的存在使得生產者和消費者可以相對獨立的運行,而沒必要亦步亦趨的跟在另外一方後面。

用計算機來模擬生產者和消費者是件很簡單的事:一個進程表明生產者,一個進程表明消費者,一片內存緩衝區表明商店。生產者生產的物品從一端放入緩衝區,消費者從另一段獲取物品,以下圖:

一個很是好的例子是校園中的售貨機。售貨機是緩衝區,負責裝載售貨機的送貨員是生產者,而購買可樂、糖果的學生是消費者。只要售貨機不滿也不空,送貨員和學生就能夠繼續他們的送貨和消費。問題是,若是學生來買可樂,卻發現售貨機空了,怎麼辦?學生固然有兩個選擇:一是坐在售貨機前面等待,直到送貨員來裝貨爲止;二是回宿舍睡覺,等售貨員裝貨後再來買。第 1 種方式顯然效率很低,估計沒有什麼人願意這麼作。比較起來,第 2 種方式要好些。只不過睡覺中的學生不可能知道售貨員來了,所以咱們須要送貨員來了後將學生叫醒。

一樣,若是送貨員來送貨發現售貨機滿時也有兩種應對辦法:一是等有人來買走一些東西,而後將售貨機填滿;二是回家睡覺,等有人買了後再來補貨。固然,這個時候購買者須要將送貨員叫醒。以程序來表示生產者和消費者問題的解決方案以下:

#define N 100 // 售貨機最大商品數
Int count = 0;  // 售貨機當前商品數

void producer(void) {
    int item;
    while(TRUE) {
        item = produce_item();
        if (count == N) sleep();
        insert_item(item);
        count = count+1;
        if (count == 1) wakeup(consumer);
    }
}

void consumer(void) {
    int item;
    while(TRUE) {
        if (count == 0) sleep();
        item = remove_item();
        count = count-1;
        if (count == N-1) wakeup(producer);
        consume_item(item);
    }
}
複製代碼

sleepwakeup 就是操做系統裏的睡覺和叫醒操做原語。一個程序調用 sleep 後將進入休眠狀態,將釋放其所佔用的 CPU。一個執行 wakeup 的程序將發送一個信號給指定的接收進程,如 wakeup(producer) 就發送一個信號給生產者。

咱們仔細來看上面的程序。最上面兩行定義了緩衝區的大小(可容納 100 件商品)和當前緩衝區裏面的商品個數,初始化爲 0。生產者程序的運行以下:每生產一件商品,檢查當前緩衝區的商品數,若是緩衝區已滿,則該程序進入睡眠狀態;不然將商品放入緩衝區,將計數加 1。而後判斷計數是否等於 1 ,若是是,說明在放這件商品前緩衝區中的商品個數爲 0,有可能存在消費者見到空緩衝區而去睡覺,所以須要發送叫醒信號給消費者。

消費者程序運行以下:先檢查當前商品計數,若是是 0,沒有商品,固然去睡覺。不然,從緩衝區拿走一件商品,將計數減 1 。而後判斷計數是否等於 N-1。若是是,則說明在拿這件商品前緩衝區的商品計數爲 N,有可能存在生產者見到滿緩衝區而去睡覺,所以須要發送叫醒信號給生產者。而後盡情地享用商品。

這個程序看上去彷佛正確無誤,但實際上仍是存在問題。

第一個問題:變量 count 沒有被保護,可能發生數據競爭。即生產者和消費者可能同時對該數據進行修改。例如,假定 count 如今等於 1。那麼生產者先運行,對 count 加 1 操做後 count 變爲 2,但在判斷 count 是否等於 1 以前,CPU 被消費者得到,隨後對 count 進行了減 1 的操做後切換回生產者,這個時候 count 等於 1,所以生產者將發出叫醒消費者的信號。顯然,這個信號是不該該發出的。

第二個問題:上述程序可能形成生產者和消費者均沒法往前推動的狀況,即死鎖。例如,假定消費者先來,這個時候 count = 0,因而去睡覺,可是在判斷 count == 0 後而且在執行 sleep 語句前 CPU 發生切換,生產者開始運行,它生產一件商品後,給 count 加 1,發現 count 結果爲 1,所以發出叫醒消費者信號。但這個時候消費者尚未睡覺(正準備要睡),因此該信號沒有任何效果,浪費了。而生產者一直運行直到緩衝區滿了後也去睡覺。這個時候 CPU 切換到消費者,而消費者執行的第 1 個操做就是 sleep,即睡覺。至此,生產者和消費者都進入睡覺狀態,從而沒法相互叫醒而繼續往前推動。系統死鎖發生。

那咱們如何解決上述兩個問題呢?對第 1 個問題,解決方案很簡單:用鎖!在進行對 count 的操做先後分別加上開鎖和閉鎖便可防止生產者和消費者同時訪問 count 狀況的出現。不過,咱們不就是由於鎖存在繁忙等待才發明 sleepwakeup 的嗎?怎麼又把鎖請回來了呢?

確實,咱們不喜歡鎖所採用的繁忙等待,於是發明了 sleepwakeup,可是,咱們不喜歡等待,並非一刻都不能等,只要等待的事件夠短,就是能夠接受的。而在 count 的訪問先後加上鎖所形成的繁忙等待是很短的。(iOS SideTable 中的自旋鎖也是這樣的鎖,由於引用計數的增減是很迅速的操做)

勉強解決了第 1 個問題,第 2 個問題怎麼解決呢?

顯然,生產者和消費者都不會本身從睡覺中醒過來。因此若是兩者同時去睡覺了,天然也沒法叫醒對方。那解決的方案就是不讓兩者同時睡覺。而形成兩者同時睡覺的緣由是生產者發出的叫醒信號丟失(由於消費者此時還沒睡覺)。那咱們就想,若是用某種方法將發出的信號累積起來,而不是丟掉,問題不就解決了嗎?在消費者得到 CPU 並執行 sleep 語句後,生產者在這以前發送的叫醒信號還保留,所以消費者將立刻得到這個信號而醒過來。而可以將信號量累積起來的操做系統原語就是信號量。

信號量

信號量(semphore) 能夠說是全部原語裏面功能最強大的。它不只是一個同步原語,仍是一個通訊原語。並且,它還能做爲鎖來使用!前面已經討論過做爲通訊原語的信號量,如今咱們來看其做爲同步原語和鎖的能力。

簡單來講,信號量就是一個計數器,其取值爲當前累積的信號數量。它支持兩個操做:加法操做 up 和減法操做 down,分別描述以下:

down 減法操做:

  1. 判斷信號量的取值是否大於等於 1
  2. 若是是,將信號量的值減去 1,繼續往下執行
  3. 不然在該信號量上等待(線程被掛起)

up 加法操做:

  1. 將信號量的值加 1(此操做將叫醒一個在該信號量上面等待的線程)
  2. 線程繼續往下執行

這裏須要注意的是,down 和 up 兩個操做雖然包含多個步驟,但這些操做是一組原子操做,它們之間是不能分開的。

若是將信號量的取值限制爲 0 和 1 兩種狀況,則得到的就是一把鎖,也被稱爲 二元信號量(binary semaphore),其操做以下:

down 減法操做:

  1. 等待信號量的值變爲 1
  2. 將信號量的值設置爲 0
  3. 繼續往下執行

up 加法操做:

  1. 將信號量的值設置爲 1
  2. 叫醒在該信號量上面等待的第 1 個線程
  3. 線程繼續往下執行

因爲二元信號量的取值只有 0 和 1,所以上述程序防止任何兩個程序同時進入臨界區。

二元信號量具有鎖的功能,實際上它與鎖很類似:down 就是得到鎖,up 就是釋放鎖。但它又比鎖更爲靈活,由於在信號量上等待的線程不是繁忙等待,而是去睡覺,等待另一個線程執行 up 操做來叫醒。所以,二元信號量從某種意義上說就是鎖和睡覺與叫醒兩種原語操做的合成。

有了信號量,咱們就能夠垂手可得地解決生產者和消費者的同步問題。具體說來就是,咱們先設置 3 個信號量,分別以下:

mutex:
一個二元信號量,用來防止兩個線程同時對緩衝區進行操做。
初始值爲 1。

full:
記錄緩衝區裏商品的件數。
初始值爲 0。

empty:
記錄緩衝區裏空置空間的數量。
初始值爲 N(緩衝區大小)
複製代碼

咱們的生產者和消費者程序以下:

const int N = 100;          // 定義緩衝區大小
typedef int semaphore;      // 定義信號量類型
semaphore mutex = 1;        // 互斥信號量
semaphore empty = N;        // 緩衝區計數信號量,用來計數緩衝區裏的空位數量
semaphore full = 0;         // 緩衝區計數信號量,用來計數緩衝區裏的商品數量

void producer(void) {
    int item;
    while(TRUE) {
        item = produce_item();
        down(empty);
        down(mutex);
        insert_item(item);
        up(mutex);
        up(full);
    }
}

void consumer(void) {
    int item;
    while(TRUE) {
        down(full);
        down(mutex);
        item = remove_item();
        up(mutex);
        up(empty);
        consume_item(item);
    }
}
複製代碼

該程序解決了前一個版本的問題嗎?很顯然,上述程序中生產者和消費者不可能同時睡覺而形成死鎖。由於兩我的同時睡覺就意味着:full=0(生產者才睡覺),而且 empty=0(消費者睡覺的條件)。那麼 emptyfull 可以同時爲 0 嗎?固然不會,由於初始值是 empty = Nfull = 0。要使 empty=0,生產者就必須生產,而一旦生產者開始生產,full 就不能爲 0 了。因此兩個不會同時睡覺。

這樣上述程序既保護了緩衝區不會被生產者和消費者同時訪問,又防止了生產者或消費者發送的信號丟失。生產者生產了多少商品,信號量 full 就取多大的值,這就至關於前一個版本里面發送的信號的個數。由於消費者等待的地方就是 full 這個信號量,所以,生產者生產了多少商品,就能夠最多這麼屢次叫醒消費者。反之亦然,消費者消費了多少商品,信號量 empty 就記錄了多少數量,也就是能夠多少次叫醒生產者。這樣就解決了信號丟失的問題。

那爲何須要 3 個信號量呢?一個二元信號量用來互斥,一個信號量用來記錄緩衝區裏商品的數量不就能夠了嗎?緩衝區裏空格的數量不是能夠由緩衝區大小和緩衝區裏商品的數量計算得出嗎?爲何須要一個 full 和一個 empty 來記錄滿的和空的呢?這是由於生產者和消費者等待的信號不一樣,它們須要在不一樣的信號上睡覺。

iOS 線程同步方案

先列舉一下上面提到的概念:

  • 同步:解決線程之間的執行順序,無論線程之間的執行如何穿插,都保證運行結果是正確的
  • 競爭:兩個或多個線程爭相執行同一段代碼或訪問同一資源的現象稱爲競爭
  • 臨界區:形成競爭的共享代碼或資源
  • 互斥:一次只有一我的使用共享資源,其餘人皆排除在外
  • 鎖:一種操做系統中保持互斥的同步機制
  • 優先級反轉(倒掛):高優先級的線程被低優先級線程阻塞
  • 睡覺與叫醒(sleep 和 wakeup):睡覺指讓線程休眠,不佔用 CPU 資源,叫醒指喚醒線程;爲解決繁忙等待問題而被提出的方案
  • 信號量:一個計數器,其取值爲當前累積的信號數量;爲解決信號丟失提出的方案
    • down 操做
      • 判斷信號量的值是否大於等於1
      • 若是是,將信號量的值減 1,繼續往下執行
      • 不然在該信號量上等待(線程被掛起)
    • up 操做
      • 將信號量的值加 1(此操做將叫醒一個在該信號量上面等待的線程)
      • 線程繼續往下執行
  • 二元信號量(互斥量):信號量取值限制爲 0 和 1,至關於鎖和睡覺與叫醒操做的一個合成

iOS 中的線程同步方案,實際上是 iOS 對鎖、信號量、互斥量、條件變量的實現。由於 iOS 中使用了 GCD,因此還可使用 GCD 的串行隊列來實現同步。iOS 中有原子操做,就是在屬性中使用 atomic,可是它有侷限性,只能保證在設置和獲取時是原子操做,但不能保證設置和獲取操做是在哪個線程中進行,下面先列舉一下 iOS 中的那些線程同步方案:

  • OSSpinLock
  • os_unfair_lock
  • pthread_mutex
    • PTHREAD_MUTEX_NORMAL
    • PTHREAD_MUTEX_ERRORCHECK
    • PTHREAD_MUTEX_RECURSIVE
    • PTHREAD_MUTEX_DEFAULT
  • dispatch_semaphore
  • dispatch_queue(DISPATCH_QUEUE_SERIAL)
  • NSLock
  • NSRecursiveLock
  • NSCondition
  • NSConditionLock
  • @synchronized

來分別看一下它們的具體含義與如何使用,在下面的例子中咱們統一使用下面這個異步方法:

- (void)doSomeThingForFlag:(NSInteger)flag finish:(void(^)(void))finish {
    dispatch_async(dispatch_get_global_queue(0, 0), ^{
        NSLog(@"do:%ld",(long)flag);
        sleep(2+arc4random_uniform(4));
        NSLog(@"finish:%ld",flag);
        if (finish) finish();
    });
}
複製代碼

OSSpinLock

OSSPinlock 就是自旋鎖,速度應該是最快的鎖,等待鎖的線程會處於 忙等(busy-wait) 狀態,一直佔用着 CPU 資源,由於它須要不斷的去嘗試獲取鎖。這種忙等狀態的鎖會形成一個很嚴重的問題,那就是優先級反轉,也稱爲優先級倒掛,前面的猴階段中也提到這個問題,另外 YYKit 的做者專門寫了一篇 再也不安全的 OSSpinLock 來解釋這個問題:

在 iOS 中,系統維護了 5 個不一樣的線程優先級/Qos:backgroundutilitydefaultuser-initiateduser-interactive。高優先級線程始終會在低優先級線程前執行,一個線程不會受到比它更低優先級線程的干擾。這種線程調度算法存在潛在的優先級反轉的問題。

具體來講,在使用自旋鎖的狀況,若是一個低優先級的線程得到鎖並訪問共享資源,這時一個高優先級的線程也嘗試得到這個鎖,它會處於忙等狀態狀態從而佔用大量 CPU。此時低優先級線程沒法與高優先級線程爭奪 CPU 時間(搶不過),從而致使任務遲遲完不成,沒法釋放 lock。這並不僅是理論上的問題,lobobjc 已經遇到不少次這個問題,因而蘋果的工程師停用了 OSSpinLock

不過仍是有解決方案,也是 libobjc 目前正在使用的:鎖的持有者把線程 ID 保存到鎖內部,鎖的等待着會臨時貢獻出它的優先級來避免優先級反轉的問題。理論上這種模式會在比較複雜的多鎖條件下產生問題,但實踐上目前還一切都好。

libobjc 裏用的是 Mach 內核的 thread_switch() 而後傳遞了一個 mach thread port 來避免優先級反轉,另外它還用了一個私有的參數選項,因此開發者沒法本身實現這個鎖。另外一方面,因爲二進制兼容問題,OSSpinLock 也不能有改動。

因此,除非開發者能保證訪問鎖的線程所有都處於同一優先級,不然 iOS 系統中全部類型的自旋鎖都不能再使用了。固然蘋果還在用,SideTable 中就包含了一個自旋鎖,用於對引用計數的增減操做,這種輕量操做也是自旋鎖的使用場景。

來看一下 OSSpinLock 的使用,要導入 <libkern/OSAtomic.h>

#import <libkern/OSAtomic.h>

/** OSSpinLock */
- (void)useOSSpinlock {
    
    __block OSSpinLock oslock = OS_SPINLOCK_INIT;
    
    OSSpinLockLock(&oslock);
    [self doSomeThingForFlag:1 finish:^{
        OSSpinLockUnlock(&oslock);
    }];
    
    OSSpinLockLock(&oslock);
    [self doSomeThingForFlag:2 finish:^{
        OSSpinLockUnlock(&oslock);
    }];
    
    OSSpinLockLock(&oslock);
    [self doSomeThingForFlag:3 finish:^{
        OSSpinLockUnlock(&oslock);
    }];
    
    OSSpinLockLock(&oslock);
    [self doSomeThingForFlag:4 finish:^{
        OSSpinLockUnlock(&oslock);
    }];
    
}
複製代碼

os_unfair_lock

os_unfair_lock 是做爲 OSSpinLock 的替代方案被提出來的,iOS 10.0 以後開始支持。不過從底層調用來看,等待 os_unfair_lock 的線程會處於休眠狀態,而並不是 OSSpinLock 的忙等狀態,線程的切換是須要資源的,因此它的效率不如 OSSpinLock

它的使用與 OSSpinLock 很相似:

#import <os/lock.h>

- (void)viewDidLoad {
    [super viewDidLoad];
    [self useOS_Unfair_Lock];
}

// 定義鎖變量
os_unfair_lock unfairLock;
- (void)useOS_Unfair_Lock {
    // 初始化鎖
    unfairLock = OS_UNFAIR_LOCK_INIT;

    NSThread *thread1 = [[NSThread alloc] initWithTarget:self selector:@selector(request1) object:nil];
    [thread1 start];

    NSThread *thread2 = [[NSThread alloc] initWithTarget:self selector:@selector(request2) object:nil];
    [thread2 start];
}

- (void)request1 {
    // 加鎖
    os_unfair_lock_lock(&unfairLock);
    NSLog(@"do:1");
    sleep(2+arc4random_uniform(4));
    NSLog(@"finish:1");
    // 解鎖
    os_unfair_lock_unlock(&unfairLock);
}

- (void)request2 {
    // 加鎖
    os_unfair_lock_lock(&unfairLock);
    NSLog(@"do:2");
    sleep(2+arc4random_uniform(4));
    NSLog(@"finish:2");
    // 解鎖
    os_unfair_lock_unlock(&unfairLock);
}
複製代碼

pthread_mutex

pthread_mutex 有幾種類型:

/* * Mutex type attributes */
#define PTHREAD_MUTEX_NORMAL 0
#define PTHREAD_MUTEX_ERRORCHECK 1
#define PTHREAD_MUTEX_RECURSIVE 2
#define PTHREAD_MUTEX_DEFAULT PTHREAD_MUTEX_NORMAL
複製代碼

PTHREAD_MUTEX_NORMAL 是缺省類型,因此只有三種類型。

  • PTHREAD_MUTEX_NORMAL:默認類型,普通鎖,當一個線程加鎖後,其他請求鎖的線程將造成一個等待隊列,並在解鎖後按優先級得到鎖。這種鎖策略保證了資源分配的公平性。
  • PTHREAD_MUTEX_ERRORCHECK:檢錯鎖,若是同一個線程請求同一個鎖,則拋出一個錯誤,不然與 PTHREAD_MUTEX_NORMAL 類型動做一致。這樣就保證當不容許屢次加鎖時不會出現最簡單狀況下的死鎖。
  • PTHREAD_MUTEX_RECURSIVE:遞歸鎖,容許同一個線程對同一個鎖成功得到屢次,並經過屢次 unlock 解鎖。若是是不一樣線程請求,則在加鎖線程解鎖時從新競爭。

PTHREAD_MUTEX_NORMAL 的使用:

#import <pthread.h>

- (void)viewDidLoad {
    [super viewDidLoad];
    [self usePthread_mutex_normal];
}

pthread_mutex_t pNormalLock;
- (void)usePthread_mutex_normal {
    
    // 初始化鎖的屬性
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
    
    // 初始化鎖
    pthread_mutex_init(&pNormalLock, &attr);
    
    // 銷燬 attr
    pthread_mutexattr_destroy(&attr);
    
    pthread_mutex_lock(&pNormalLock);
    [self doSomeThingForFlag:1 finish:^{
        pthread_mutex_unlock(&pNormalLock);
    }];
    
    pthread_mutex_lock(&pNormalLock);
    [self doSomeThingForFlag:2 finish:^{
        pthread_mutex_unlock(&pNormalLock);
    }];
    
    pthread_mutex_lock(&pNormalLock);
    [self doSomeThingForFlag:3 finish:^{
        pthread_mutex_unlock(&pNormalLock);
    }];
    
    pthread_mutex_lock(&pNormalLock);
    [self doSomeThingForFlag:4 finish:^{
        pthread_mutex_unlock(&pNormalLock);
    }];
    
}
複製代碼

PTHREAD_MUTEX_ERRORCHECKPTHREAD_MUTEX_NORMAL 只是多了個同線程對同一把鎖加鎖的話會拋出一個錯誤,咱們來看一下遞歸鎖。遞歸鎖意思是同一個線程能夠屢次得到同一個鎖,其餘線程若是想要獲取這把鎖,必需要等待,這種鎖通常都是用於遞歸函數的狀況。

遞歸鎖的使用:

#import <pthread.h>

- (void)viewDidLoad {
    [super viewDidLoad];
    [self usePthread_mutex_recursive];
}

pthread_mutex_t pRecursiveLock;
- (void)usePthread_mutex_recursive {
    // 初始化鎖屬性
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    
    // 初始化鎖
    pthread_mutex_init(&pRecursiveLock, &attr);
    
    // 銷燬attr
    pthread_mutexattr_destroy(&attr);
    
    [self thread1];
    
}

- (void)thread1 {
    pthread_mutex_lock(&pRecursiveLock);
    static int count = 0;
    count ++;
    if (count < 10) {
        NSLog(@"do:%d",count);
        [self thread1];
    }
    pthread_mutex_unlock(&pRecursiveLock);
    NSLog(@"finish:%d",count);
}

- (void)dealloc {
    // 銷燬鎖
    pthread_mutex_destroy(&pRecursiveLock);
}

@end
複製代碼

dispatch_semaphore

dispatch_semaphore 是 GCD 實現的信號量,信號量是基於計數器的一種多線程同步機制,內部有一個能夠原子遞增或遞減的值,關於信號量的 API 主要是三個,createwaitsignal

信號量在初始化時要指定 value,隨後內部將這個 value 存儲起來。實際操做會存在兩個 value,一個是當前的 value,一個是記錄初始 value

信號的 waitsignal 是互逆的兩個操做。若是 value 大於 0,前者將 value 減一,此時若是 value 小於 0 就一直等待。後者將 value 加一。

初始 value 必須大於等於 0,若是爲 0 並隨後調用 wait 方法,線程將被阻塞直到別的線程調用了 signal 方法。

簡單來說,信號量爲 0 則阻塞線程,大於 0 則不會阻塞,能夠經過改變信號量的值,來控制是否阻塞線程,從而達到線程同步。

// 建立信號量,參數:信號量的初始值,若是小於 0 會返回 NULL
dispatch_semaphore_t dispatch_semaphore_create(long value);

// 等待下降信號量,接收一個信號和時間值(多爲 DISPATCH_TIME_FOREVER)
// 若信號的信號量爲 0,則會阻塞當前線程,直到信號量大於 0 或者通過輸入的時間值
// 若信號量大於 0,則會使信號量減 1 並返回,程序繼續往下執行
long dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout);

// 增長信號量,使信號量加 1 並返回
long dispatch_semaphore_signal(dispatch_semaphore_t dsema);
複製代碼

dispatch_semaphore_waitdispatch_semaphore_signal 這兩個函數中間的執行代碼,每次只會容許限定數量的線程進入。咱們通常須要控制線程數量的時候使用信號量,下面介紹信號量的幾個使用場景。

保持線程同步,將異步操做轉換爲同步操做

/** 保持線程同步,將異步操做轉換爲同步操做 */
- (void)semaphoreTest1 {
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
    
    __block int i = 0;
    dispatch_async(queue, ^{
        i = 100;
        dispatch_semaphore_signal(semaphore);
    });
    
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    NSLog(@"i = %d",i);
}
複製代碼

結果輸出 i = 100block 異步執行添加到了全局併發隊列裏,因此程序在主線程會跳過 block 塊(同時開闢子線程異步執行 block),執行 block 外的代碼 dispatch_semaphore_wait,由於 semaphore 信號量爲 0,且時間爲 DISPATCH_TIME_FOREVER,因此會阻塞當前線程(主線程),進而只執行子線程的 block,直到 block 內部的 dispatch_semaphore_signal 使得信號量 +1。正在被阻塞的線程(主線程)會恢復繼續執行,這樣就保證了線程之間的同步。

爲線程加鎖

/** 爲線程加鎖 */
- (void)semaphoreTest2 {
    
    dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);
    
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    [self doSomeThingForFlag:1 finish:^{
        dispatch_semaphore_signal(semaphore);
    }];
    
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    [self doSomeThingForFlag:2 finish:^{
        dispatch_semaphore_signal(semaphore);
    }];
    
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    [self doSomeThingForFlag:3 finish:^{
        dispatch_semaphore_signal(semaphore);
    }];
    
    dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
    [self doSomeThingForFlag:4 finish:^{
        dispatch_semaphore_signal(semaphore);
    }];
    
}
複製代碼

限制線程最大併發數

/** 限制線程最大併發數 */
- (void)semaphoreTest3 {
    dispatch_semaphore_t semaphore = dispatch_semaphore_create(3);
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    for (int i = 0; i < 100; i++) {
        dispatch_async(queue, ^{
            dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
            NSLog(@"running");
            sleep(1);
            NSLog(@"completed...................");
            dispatch_semaphore_signal(semaphore);
        });
    }
}
複製代碼

看控制檯打印能夠看到線程的最大併發數被限制在了 3。不過更好的作法是使用 NSOperationQueueNSOperation 來實現,而不是經過 GCD 和信號量來構建本身的解決方案。

dispatch_queue(DISPATCH_QUEUE_SERIAL)

- (void)dispatch_queue_serial {
    dispatch_queue_t queue = dispatch_queue_create("myQueue", DISPATCH_QUEUE_SERIAL);
    
    for (NSInteger i = 0; i < 100; i++) {
        dispatch_async(queue, ^{
            dispatch_suspend(queue);
            [self doSomeThingForFlag:i finish:^{
                dispatch_resume(queue);
            }];
        });
    }
}
複製代碼

關鍵是使用 dispatch_suspend()dispatch_resume()

既然 GCD 能夠實現,那麼封裝了 GCD 的 NSOperationQueue 天然也可以實現。

- (void)userOperationQueue {
    
    NSOperationQueue *queue = [[NSOperationQueue alloc] init];
    [queue setMaxConcurrentOperationCount:1];
    
    __weak typeof(self) weakSekf = self;
    
    NSBlockOperation *operation1 = [NSBlockOperation blockOperationWithBlock:^{
        [queue setSuspended:YES];
        [weakSekf doSomeThingForFlag:1 finish:^{
            [queue setSuspended:NO];
        }];
    }];
    
    NSBlockOperation *operation2 = [NSBlockOperation blockOperationWithBlock:^{
        [queue setSuspended:YES];
        [weakSekf doSomeThingForFlag:2 finish:^{
            [queue setSuspended:NO];
        }];
    }];
    
    NSBlockOperation *operation3 = [NSBlockOperation blockOperationWithBlock:^{
        [queue setSuspended:YES];
        [weakSekf doSomeThingForFlag:3 finish:^{
            [queue setSuspended:NO];
        }];
    }];
    
    NSBlockOperation *operation4 = [NSBlockOperation blockOperationWithBlock:^{
        [queue setSuspended:YES];
        [weakSekf doSomeThingForFlag:4 finish:^{
            [queue setSuspended:NO];
        }];
    }];
    
    [operation4 addDependency:operation3];
    [operation3 addDependency:operation2];
    [operation2 addDependency:operation1];
    
    [queue addOperation:operation1];
    [queue addOperation:operation2];
    [queue addOperation:operation3];
    [queue addOperation:operation4];
    
}
複製代碼

NSLock

NSLock 是對 PTHREAD_MUTEX_ERRORCHECK 類型的 pthread_mutex_t 的封裝。

- (void)useNSLock {
    NSLock *nsLock = [[NSLock alloc] init];
    
    [nsLock lock];
    [self doSomeThingForFlag:1 finish:^{
        [nsLock unlock];
    }];
    
    [nsLock lock];
    [self doSomeThingForFlag:2 finish:^{
        [nsLock unlock];
    }];
    
    [nsLock lock];
    [self doSomeThingForFlag:3 finish:^{
        [nsLock unlock];
    }];
    
    [nsLock lock];
    [self doSomeThingForFlag:4 finish:^{
        [nsLock unlock];
    }];
}
複製代碼

NSRecursiveLock

NSRecursiveLock *recursiveLock;
- (void)useNSRecursiveLock {
    recursiveLock = [[NSRecursiveLock alloc] init];
    [self thread2];
}

- (void)thread2 {
    [recursiveLock lock];
    static int count = 0;
    count ++;
    if (count < 10) {
        NSLog(@"do:%d",count);
        [self thread2];
    }
    [recursiveLock unlock];
    NSLog(@"finish:%d",count);
}
複製代碼

pthread_cond_t & NSCondition

條件變量,能夠看看上文中關於睡覺與叫醒那部分,意爲當知足條件時,喚醒線程,不知足時線程會進行休眠。

以一個 生產-消費者 模式來看看 pthread_cond_t 如何使用:

pthread_mutex_t pMutex;
pthread_cond_t pCond;
NSData *data;
int count = 1;
- (void)usePthreadCond {
// // 初始化鎖屬性
    pthread_mutexattr_t mutexAttr;
    pthread_mutexattr_init(&mutexAttr);
    pthread_mutexattr_settype(&mutexAttr, PTHREAD_MUTEX_NORMAL);

    // 初始化條件變量屬性
    pthread_condattr_t condAttr;
    pthread_condattr_init(&condAttr);

    // 初始化條件變量
    pthread_cond_init(&pCond, &condAttr);
    // 初始化鎖
    pthread_mutex_init(&pMutex, &mutexAttr);

    // 銷燬 attr
    pthread_mutexattr_destroy(&mutexAttr);
    pthread_condattr_destroy(&condAttr);
    
    data = nil;
    [self producter];  // 保證模型能走動,先執行一次生產者的操做
    
    for (int i = 0; i < 10; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self consumer];
        });
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self producter];
        });
    }

}

- (void)consumer {  // 消費者
    pthread_mutex_lock(&pMutex);
    while (data == nil) {
        pthread_cond_wait(&pCond, &pMutex);  // 等待數據
    }
    
    // 處理數據
    NSLog(@"data is finish");
    data = nil;
    
    pthread_mutex_unlock(&pMutex);
}

- (void)producter {  // 生產者
    pthread_mutex_lock(&pMutex);
    // 生產數據
    data = [[NSData alloc] init];
    NSLog(@"preparing data");
    sleep(1);
    
    pthread_cond_signal(&pCond);  // 發出信號,數據已完成
    pthread_mutex_unlock(&pMutex);
}
複製代碼

而後來看看 NSCondition,它是對 pthread_cond_tpthread_mutex_t 的一個封裝:

NSCondition *cond;
NSData *ns_data;
- (void)useNSCondition {
    
    cond = [[NSCondition alloc] init];
    ns_data = nil;
    
    [self ns_producter];
    
    for (int i = 0; i < 10; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self ns_consumer];
        });
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            [self ns_producter];
        });
    }

}

- (void)ns_consumer {
    [cond lock];
    while (ns_data == nil) {
        [cond wait];  // 等待數據
    }
    // 處理數據
    NSLog(@"data is finish");
    [cond unlock];
}

- (void)ns_producter {
    [cond lock];
    // 生產數據
    ns_data = [[NSData alloc] init];
    NSLog(@"preparing data");
    sleep(1);
    
    [cond signal];  // 發出信號,數據已完成
    [cond unlock];
}
複製代碼

使用 NSCondition 不須要另外建立一個鎖,直接使用 [cond lock] 便可。

NSConditionLock

NSConditionLock 藉助 NSCondition 來實現,它的本質就是一個「生產-消費者」模型。「條件被知足」 能夠理解爲生產者提供了新的內容。NSConditionLock 的內部持有一個 NSCondition 對象,以及 _condition_value 屬性,在初始化時就會對這個屬性進行賦值。

// 簡化版代碼
- (id)initWithCondition:(NSInteger)value {
    if (nil != (self = [super inir])) {
        _condition = [NSCondition new];
        _condition_value = value;
    }
    return self;
}
複製代碼

它的 lockWhenCondition: 其實就是消費者方法:

- (void)lockWhenCondition:(NSInteger)value {
    [_condition lock];
    while (value != _condition_value) {
        [_condition wait];
    }
}
複製代碼

對應的 unlockWhenCondition: 方法則是生產者,使用了 boardcast 方法通知全部的消費者:

- (void)unlockWithCondition:(NSInteger)value {
    _condition_value = value;
    [_condition broadcast];
    [_condition unlock];
}
複製代碼

下面是一個完整的例子:

- (void)useNSConditionLock {
    NSConditionLock *condLock = [[NSConditionLock alloc] initWithCondition:1];
    
    [condLock lockWhenCondition:1];
    [self doSomeThingForFlag:1 finish:^{
        [condLock unlockWithCondition:2];
    }];
    
    [condLock lockWhenCondition:2];
    [self doSomeThingForFlag:2 finish:^{
        [condLock unlockWithCondition:3];
    }];
    
    [condLock lockWhenCondition:3];
    [self doSomeThingForFlag:3 finish:^{
        [condLock unlockWithCondition:4];
    }];
    
    [condLock lockWhenCondition:4];
    [self doSomeThingForFlag:4 finish:^{
        [condLock unlock];
    }];
    
}
複製代碼

NSConditionLock 是對 NSCondition 的進一步封裝,能夠對條件變量賦值,這樣咱們就能夠用它來實現順序執行線程。

@synchronized

是對 pthread_mutex_t 中遞歸鎖的一個封裝,蘋果不推薦使用,由於性能差。

- (void)useSynchronized {
    [self thread5];
}

- (void)thread5 {
    static int count = 0;
    @synchronized (self) {
        count ++;
        if (count < 10) {
            NSLog(@"%d",count);
            [self thread5];
        }
    }
    NSLog(@"finish:%d",count);
}
複製代碼

@synchronized 後面要跟一個 OC 對象,底層會以這個對象對大括號中的代碼進行加鎖,它其實是把這個對象當作鎖來使用,經過一個哈希表來實現,在 OC 的底層使用了一個互斥鎖的數組(能夠理解爲鎖池),經過對象的哈希值來獲取對應的互斥鎖。因此這實際上是一個 OC 層面的鎖,主要是經過犧牲性能換來語法上的簡潔和可讀性。

小結

以上的這些 Demo 我放到了 github 中。

同步方案的性能

由高到低排序(不絕對,狀況不同性能也有有區別):

  • OSSpinLock
  • os_unfair_lock
  • dispatch_semaphore
  • pthread_mutex
  • dispatch_queue(DISPATCH_QUEUE_SERIAL)
  • NSLock
  • NSCondition
  • pthread_mutex(recursive)
  • NSRecursiveLock
  • NSConditionLock
  • @synchronized

OC 對 pthread 的同種類型的鎖、信號量的封裝出來的對象,性能都不如直接使用 pthread,主要是由於 OC 多了一個消息傳遞的過程。

最後貼一下 YYKit 做者的一個性能比較圖:

參考文章

《計算機的心智操做系統之哲學原理(第2版)》

Linux線程-互斥鎖pthread_mutex_t

深刻理解 GCD

iOS GCD之dispatch_semaphore(信號量)

iOS簡單優雅的實現複雜狀況下的串行需求(各類鎖、GCD 、NSOperationQueue...)

再也不安全的 OSSpinLock

相關文章
相關標籤/搜索