在計算機科學中,併發的定義是指:在一個程序的運行過程當中,程序的不一樣部分能夠以亂序或者部分有序的方式執行,可是最終程序的輸出結果與順序執行一致。linux
定義中有兩個關鍵點golang
假設程序 由
兩部分組成,
依賴
,順序執行狀況下,先執行
而後執行
,輸出結果爲
,耗時:
。數據庫
進一步研究發現, 能夠分紅
兩部分,且
依賴於另外一個任務
,也就是執行完
以後,須要等待
也執行完才能繼續往下走,令等待
完成的時間爲
,那麼就有
,這是順序執行狀況下的耗時。編程
再進一步研究發現, 僅僅依賴於
,爲了提升效率,咱們能夠這樣作,執行完
以後,爲了不CPU等待空閒,直接調度任務
,等任務
完成以後,假設任務
也完成了,那麼切換到任務
執行完
部分。數據結構
這是一個簡單的併發case,能夠看到多線程
那爲何要費這麼多事來實現併發呢,老老實實順序執行很差嗎?換句話說,經過併發咱們得到了什麼。併發
效率是關鍵。在上面的例子中,採用併發的執行方式, 被節省下來。編程語言
併發帶來的另外一個明顯好處是多任務,就算是在一個單核CPU(single processor)機器上,也能同時運行多個應用,這是由於多個應用能夠分時複用CPU,這是多個應用之間的併發。實際上,單核CPU同一時刻只能運行一個應用(這就是爲何我把上文的「同時」二字加粗的緣由),可是從用戶的視角來看好像有多個CPU同樣,應用之間的併發虛擬化出了多個CPU的效果。函數
沒有併發的世界是可怕的,想想你只有把所有的工做作完才能去玩遊戲,但是工做哪有作完的時候呢?我只好在工做和遊戲之間來回切換,切換是有代價的,全情投入工做以前,要先把遊戲裏的心思先收回來,回憶起上一段工做的內容,這跟線程切換幾乎如出一轍。把我類比成CPU,實際上,我從事的各類活動是在分時複用「我」這個資源的。工具
通常咱們遇到線程併發和協程併發的狀況比較多,這裏的線程和協程就是併發單位。爲了具體的說明問題,咱們拿線程的併發舉例。
第一個關鍵概念是臨界區(critical section)。臨界區指的是一段代碼,這段代碼會訪問共享資源,這個共享資源多是一個簡單變量,也多是一個更加複雜的數據結構。
第二個關鍵概念是競爭條件(race condition)。競爭條件是指,在多線程程序中,多個線程有可能幾乎同時訪問臨界區,而且嘗試更新共享資源,這可能致使意想不到的結果。好比說,兩個線程同時對共享變量x執行自增操做,結果多是+1,也多是+2。也就是程序的運行結果是不肯定的,這樣的程序叫作非肯定程序(indeterminate program),這是第三個關鍵概念。
程序運行的非肯定性,這是併發要解決的本質問題,至於併發程序難於編寫、難於理解、編寫不當還會出現死鎖,這些都是屬於技術層面的問題(因此併發的定義中沒有提到效率)。
爲了保證併發程序的肯定性,咱們須要使用一些工具,這些工具叫同步原語(mutual exclusion primitives),具體來講有:
互斥變量提供一種加鎖機制。在訪問臨界區的以前,調用互斥變量的lock函數,可以保證每次只有一個線程進入到臨界區,固然,離開臨界區以後作的第一件事就是調用互斥變量提供的unlock函數,釋放共享資源,保證其餘線程或者當前線程下一次可以再次進入臨界區。多線程環境下,共享變量x的自增操做可使用互斥變量來保證正確性。
互斥變量提供一種互斥訪問的機制,條件變量提供的則是同步機制。想讓任務B在任務A以後執行,只須要使用互斥變量m,在調度任務B以前調用m的wait函數,在執行任務A以後調用m的signal函數。m的做用是,無論調度順序怎麼樣,在signal執行以前,wait會一直等待。
信號量最先由Dijkstra提出,目的也是爲了防止競爭條件的出現,可是其原始語義與條件變量和信號量不同,而且咱們會看到,互斥變量和條件變量都是信號量的一種特殊形式。
每一個信號量都有一個counter,表明當前可用資源數,信號量還提供兩個操做,sem_wait:當counter-1大於0的時候返回成功,且執行counter減1,當counter-1小於0的時候阻塞;sem_post,執行counter加1操做,且若是當前有線程正在等待,隨機喚醒其中一個線程。
counter值只能取0或者1的的信號量稱之爲布爾信號量(binary semaphore),counter初始值爲1的布爾信號量功能至關於互斥變量,counter初始值爲0的布爾信號量至關於條件變量。
下面用具體的case說明爲何binary semaphore能夠實現互斥變量、條件變量的功能。
golang中的sync.Mutex就是互斥變量,如上所述,互斥變量能夠解決多線程共享變量自增的正確性。
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m *sync.Mutex) {
m.Lock()
x = x + 1
m.Unlock()
wg.Done()
}
func main() {
var w sync.WaitGroup
var m sync.Mutex
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, &m) // 這裡必定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
複製代碼
互斥變量做用等同於容量爲1的信號量,因此上面的case能夠改寫成:
package main
import (
"fmt"
"sync"
)
var x = 0
func increment(wg *sync.WaitGroup, m chan int) {
m <- 1 // 信號量代替mutex
x = x + 1
<- m
wg.Done()
}
func main() {
var w sync.WaitGroup
m := make(chan int, 1)
for i := 0; i < 1000; i++ {
w.Add(1)
go increment(&w, m) // 這裡必定要用 address
}
w.Wait()
fmt.Println("final value of x", x)
}
複製代碼
GOLANG中的條件變量就是unbuffered channel。實際上,channel就是golang中的信號量實現,buffered channel的capacity就是信號量中的counter,不防統一稱之爲容量。
事實上,unbuffered channel的capacity等於0,前面說過,容量爲0的信號量做用等同於條件變量。
下面的case我想在程序退出(也就是main goroutine結束)以前在屏幕上輸出hello world,爲了實現這點,我使用了done這個類型爲chan bool的channel變量。
package main
import (
"fmt"
"time"
)
func hello(done chan bool) {
fmt.Println("hello world")
time.Sleep(4 * time.Second)
done <- true
}
func main() {
done := make(chan bool) // done的做用等同於條件變量
fmt.Println("Main going to call hello go goroutine")
go hello(done)
<- done // 管道讀操做一直block,直到 hello goroutine執行並往管道中寫數據,註釋掉此行,main goroutine會一直執行到結束,hello goroutine不會被調度
fmt.Println("Main received data")
}
複製代碼
C語言中同步原語的實現體如今pthread
(POSIX Threads,POSIX是個標準,pthread是按照POSIX關於線程的標準實現的線程庫)這個庫中。
另外,pthread
還提供pthread_join
函數,其語義與GOLANG中的waitgroup一致。
GOLANG中的SELECT語義pthread
庫沒有直接提供,可是POSIX標準裏面定義了select和pselect這兩個功能差很少的函數來實現這個語義,linux中這兩個函數都是做爲系統調用實現,不一樣的是select和pselect監聽的都是文件描述符(poll epoll select的區別與聯繫)。
下面用C和GOLANG兩種語言實現多producer,多consumer的生產者消費者隊列。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <assert.h>
#include <semaphore.h>
#define MAX 4
int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;
int loops = 100;
void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
sem_t empty;
sem_t full;
sem_t mutex;
void *producer(void *arg) {
int i;
for(i = 0; i < loops; i++)
{
sem_wait(&empty);
sem_wait(&mutex);
put(i);
sem_post(&mutex);
sem_post(&full);
}
return 0;
}
void *consumer(void *arg) {
int i, tmp = 0;
for(i = 0; i < loops; i++)
{
sem_wait(&full);
sem_wait(&mutex);
tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("current number : %d\n", tmp);
}
return 0;
}
int main() {
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
pthread_t p, c, p1, p2, c1, c2;
pthread_create(&p, NULL, producer, NULL);
pthread_create(&p1, NULL, producer, NULL);
pthread_create(&p2, NULL, producer, NULL);
pthread_create(&c, NULL, consumer, NULL);
pthread_create(&c1, NULL, consumer, NULL);
pthread_create(&c2, NULL, consumer, NULL);
pthread_join(p, NULL);
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_join(c, NULL);
pthread_join(c1, NULL);
pthread_join(c2, NULL);
return 0;
}
複製代碼
package main
import (
"fmt"
)
var MSG_BUFFER = 4
var COSUMER_CNT = 3
var NUM_CNT = 100
var msgs = make(chan int, MSG_BUFFER)
// 多個消費者,用buffered channel控制消費者所有執行完以後推出main goroutine
var done = make(chan int, COSUMER_CNT)
func produce() {
for i := 0; i < NUM_CNT; i++ {
msgs <- i
}
}
func consume() {
for i := 0; i < NUM_CNT; i++ {
msg := <-msgs
fmt.Println(msg)
}
done <- 1
}
func main () {
for i:= 0; i < COSUMER_CNT; i++ {
go produce()
go consume()
}
for i:= 0; i < COSUMER_CNT; i++ {
<- done
}
}
複製代碼
互斥變量其實是一個鎖,條件變量和信號量都是基於鎖實現的,有必要說說鎖的原理,下篇內容包括: