併發的關鍵概念

併發

併發是什麼?

在計算機科學中,併發的定義是指:在一個程序的運行過程當中,程序的不一樣部分能夠以亂序或者部分有序的方式執行,可是最終程序的輸出結果與順序執行一致。linux

定義中有兩個關鍵點golang

  1. 亂序或者部分有序
  2. 結果與順序執行一致

假設程序 XA,B 兩部分組成,B 依賴 A,順序執行狀況下,先執行 A 而後執行 B,輸出結果爲 y,耗時:T_A + T_B數據庫

進一步研究發現,A 能夠分紅 A_1,A_2 兩部分,且 A_2 依賴於另外一個任務 C,也就是執行完A_1以後,須要等待 C 也執行完才能繼續往下走,令等待 C 完成的時間爲 T_C,那麼就有 T_A + T_B= T_{A1} + T_C + T_{A2} + T_B,這是順序執行狀況下的耗時。編程

再進一步研究發現,B 僅僅依賴於 A_1,爲了提升效率,咱們能夠這樣作,執行完 A_1 以後,爲了不CPU等待空閒,直接調度任務 B,等任務 B 完成以後,假設任務 C 也完成了,那麼切換到任務 A 執行完 A_2 部分。數據結構

這是一個簡單的併發case,能夠看到多線程

  1. 程序的執行順序變成部分有序,從A_1 -> A_2 -> B 變成 A_1 -> B -> A_2
  2. 任務 BA_2 的前置依賴均被知足,保證了最終輸出結果依然是 y

爲何要併發?

那爲何要費這麼多事來實現併發呢,老老實實順序執行很差嗎?換句話說,經過併發咱們得到了什麼。併發

效率是關鍵。在上面的例子中,採用併發的執行方式,T_C 被節省下來。編程語言

併發帶來的另外一個明顯好處是多任務,就算是在一個單核CPU(single processor)機器上,也能同時運行多個應用,這是由於多個應用能夠分時複用CPU,這是多個應用之間的併發。實際上,單核CPU同一時刻只能運行一個應用(這就是爲何我把上文的「同時」二字加粗的緣由),可是從用戶的視角來看好像有多個CPU同樣,應用之間的併發虛擬化出了多個CPU的效果。函數

沒有併發的世界是可怕的,想想你只有把所有的工做作完才能去玩遊戲,但是工做哪有作完的時候呢?我只好在工做和遊戲之間來回切換,切換是有代價的,全情投入工做以前,要先把遊戲裏的心思先收回來,回憶起上一段工做的內容,這跟線程切換幾乎如出一轍。把我類比成CPU,實際上,我從事的各類活動是在分時複用「我」這個資源的。工具

併發有什麼問題?

通常咱們遇到線程併發和協程併發的狀況比較多,這裏的線程和協程就是併發單位。爲了具體的說明問題,咱們拿線程的併發舉例。

第一個關鍵概念是臨界區(critical section)。臨界區指的是一段代碼,這段代碼會訪問共享資源,這個共享資源多是一個簡單變量,也多是一個更加複雜的數據結構。

第二個關鍵概念是競爭條件(race condition)。競爭條件是指,在多線程程序中,多個線程有可能幾乎同時訪問臨界區,而且嘗試更新共享資源,這可能致使意想不到的結果。好比說,兩個線程同時對共享變量x執行自增操做,結果多是+1,也多是+2。也就是程序的運行結果是不肯定的,這樣的程序叫作非肯定程序(indeterminate program),這是第三個關鍵概念。

程序運行的非肯定性,這是併發要解決的本質問題,至於併發程序難於編寫、難於理解、編寫不當還會出現死鎖,這些都是屬於技術層面的問題(因此併發的定義中沒有提到效率)。

怎麼實現併發?

爲了保證併發程序的肯定性,咱們須要使用一些工具,這些工具叫同步原語(mutual exclusion primitives),具體來講有:

  1. 互斥變量(mutex)
  2. 條件變量(condition variable,也叫作monitor)
  3. 信號量(semaphore)

互斥變量提供一種加鎖機制。在訪問臨界區的以前,調用互斥變量的lock函數,可以保證每次只有一個線程進入到臨界區,固然,離開臨界區以後作的第一件事就是調用互斥變量提供的unlock函數,釋放共享資源,保證其餘線程或者當前線程下一次可以再次進入臨界區。多線程環境下,共享變量x的自增操做可使用互斥變量來保證正確性。

互斥變量提供一種互斥訪問的機制,條件變量提供的則是同步機制。想讓任務B在任務A以後執行,只須要使用互斥變量m,在調度任務B以前調用m的wait函數,在執行任務A以後調用m的signal函數。m的做用是,無論調度順序怎麼樣,在signal執行以前,wait會一直等待。

信號量最先由Dijkstra提出,目的也是爲了防止競爭條件的出現,可是其原始語義與條件變量和信號量不同,而且咱們會看到,互斥變量和條件變量都是信號量的一種特殊形式。

每一個信號量都有一個counter,表明當前可用資源數,信號量還提供兩個操做,sem_wait:當counter-1大於0的時候返回成功,且執行counter減1,當counter-1小於0的時候阻塞;sem_post,執行counter加1操做,且若是當前有線程正在等待,隨機喚醒其中一個線程。

counter值只能取0或者1的的信號量稱之爲布爾信號量(binary semaphore),counter初始值爲1的布爾信號量功能至關於互斥變量,counter初始值爲0的布爾信號量至關於條件變量。

GOLANG中的互斥變量、條件變量、信號量

下面用具體的case說明爲何binary semaphore能夠實現互斥變量、條件變量的功能。

GOLANG中的互斥變量

golang中的sync.Mutex就是互斥變量,如上所述,互斥變量能夠解決多線程共享變量自增的正確性。

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m *sync.Mutex) {
  m.Lock()
  x = x + 1
  m.Unlock()
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  var m sync.Mutex
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, &m) // 這裡必定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

複製代碼

互斥變量做用等同於容量爲1的信號量,因此上面的case能夠改寫成:

package main

import (
  "fmt"
  "sync"
)

var x = 0

func increment(wg *sync.WaitGroup, m chan int) {
  m <- 1 // 信號量代替mutex
  x = x + 1
  <- m
  wg.Done()
}
func main() {
  var w sync.WaitGroup
  m := make(chan int, 1)
  for i := 0; i < 1000; i++ {
      w.Add(1)
      go increment(&w, m) // 這裡必定要用 address
  }
  w.Wait()
  fmt.Println("final value of x", x)
}

複製代碼

GOLANG中的條件變量

GOLANG中的條件變量就是unbuffered channel。實際上,channel就是golang中的信號量實現,buffered channel的capacity就是信號量中的counter,不防統一稱之爲容量。

事實上,unbuffered channel的capacity等於0,前面說過,容量爲0的信號量做用等同於條件變量。

下面的case我想在程序退出(也就是main goroutine結束)以前在屏幕上輸出hello world,爲了實現這點,我使用了done這個類型爲chan bool的channel變量。

package main

import (
    "fmt"
    "time"
)

func hello(done chan bool) {
    fmt.Println("hello world")
    time.Sleep(4 * time.Second)
    done <- true
}
func main() {
    done := make(chan bool) // done的做用等同於條件變量
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <- done // 管道讀操做一直block,直到 hello goroutine執行並往管道中寫數據,註釋掉此行,main goroutine會一直執行到結束,hello goroutine不會被調度
    fmt.Println("Main received data")
}

複製代碼

C語言中的互斥變量、條件變量、信號量

C語言中同步原語的實現體如今pthread(POSIX Threads,POSIX是個標準,pthread是按照POSIX關於線程的標準實現的線程庫)這個庫中。

  • pthread_mutex_t:互斥變量類型,加鎖:pthread_mutex_lock,釋放鎖:pthread_mutex_unlock
  • pthread_cond_t:條件變量類型,等待:pthread_cond_wait,喚醒:pthread_cond_signal
  • sem_t:信號量類型,wait:sem_wait,post:sem_post

另外,pthread還提供pthread_join函數,其語義與GOLANG中的waitgroup一致。

GOLANG中的SELECT語義pthread庫沒有直接提供,可是POSIX標準裏面定義了select和pselect這兩個功能差很少的函數來實現這個語義,linux中這兩個函數都是做爲系統調用實現,不一樣的是select和pselect監聽的都是文件描述符(poll epoll select的區別與聯繫)。

生產者消費者問題

下面用C和GOLANG兩種語言實現多producer,多consumer的生產者消費者隊列。

  1. C語言版
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>

#define MAX 4

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

int loops = 100;

void put(int value) {
    buffer[fill] = value;
    fill = (fill + 1) % MAX;
    count++;
}

int get() {
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    count--;

    return tmp;
}

sem_t empty;
sem_t full;
sem_t mutex;

void *producer(void *arg) {
    int i;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&empty);
        sem_wait(&mutex);
        put(i);
        sem_post(&mutex);
        sem_post(&full);
    }

    return 0;
}

void *consumer(void *arg) {
    int i, tmp = 0;
    for(i = 0; i < loops; i++)
    {
        sem_wait(&full);
        sem_wait(&mutex);
        tmp = get();
        sem_post(&mutex);
        sem_post(&empty);
        printf("current number : %d\n", tmp);
    }

    return 0;
}


int main() {
    sem_init(&empty, 0, MAX);
    sem_init(&full, 0, 0);
    sem_init(&mutex, 0, 1);

    pthread_t p, c, p1, p2, c1, c2;

    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&p1, NULL, producer, NULL);
    pthread_create(&p2, NULL, producer, NULL);

    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&c1, NULL, consumer, NULL);
    pthread_create(&c2, NULL, consumer, NULL);

    pthread_join(p, NULL);
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    pthread_join(c, NULL);
    pthread_join(c1, NULL);
    pthread_join(c2, NULL);

    return 0;
}
複製代碼
  1. GOLANG版
package main

import (
  "fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100

var msgs = make(chan int, MSG_BUFFER)
// 多個消費者,用buffered channel控制消費者所有執行完以後推出main goroutine
var done = make(chan int, COSUMER_CNT)

func produce() {
    for i := 0; i < NUM_CNT; i++ {
        msgs <- i
    }
}

func consume() {
    for i := 0; i < NUM_CNT; i++ {
        msg := <-msgs
        fmt.Println(msg)
    }
    done <- 1
}

func main () {
    for i:= 0; i < COSUMER_CNT; i++ {
        go produce()
        go consume()
    }

    for i:= 0; i < COSUMER_CNT; i++ {
        <- done
    }
}
複製代碼

總結以及下篇展望

互斥變量其實是一個鎖,條件變量和信號量都是基於鎖實現的,有必要說說鎖的原理,下篇內容包括:

  • cpu層面的原子操做 TAS case(彙編代碼)
  • 幾種鎖的實現方式(硬件中斷鎖、自旋鎖、隊列鎖)
  • 評估不一樣鎖實現方式的關鍵:效率和公平
  • 幾種編程語言中的鎖:GOLANG、JAVA
  • 數據庫中的鎖

參考

  • 本文大部分知識經過閱讀Operating Systems: Three Easy Pieces這本操做系統教材習得,這是我讀過的最好的關於操做系統的教材。本書最大的特色是將操做系統僅僅分紅三個部分來說述:虛擬化、並行、持久化,雖然只有三個部分,操做系統的核心部分卻都覆蓋到了。本書做者的語言功底很強,讀起來朗朗上口。另外,本書的脈絡清楚,整本書都在回答一個問題:操做系統做爲第一個也是最重要的一個軟件,是如何使得計算機系統易於用戶使用的。
相關文章
相關標籤/搜索