Perl多線程(2):數據共享和線程安全

線程數據共享

在介紹Perl解釋器線程的時候一直強調,Perl解釋器線程在被建立出來的時候,將從父線程中拷貝數據到子線程中,使得數據是線程私有的,而且數據是線程隔離的。若是真的想要在線程間共享數據,須要顯式使用threads::shared模塊來擴展threads模塊的功能。這個模塊必須在先導入了threads模塊的狀況下使用,不然threads::shared模塊裏的功能都將沒效果。html

use threads;
use threads::shared;

要共享數據,只需使用threads::shared模塊的share方法便可,也能夠直接將數據標記上:shared屬性的方式來共享。例如:python

my $answer = 43;
my @arr = qw(1 2 3);
share($answer);
share(@arr);

my $answer :shared = 43;
my @arr :shared = qw(1 2 3);
my %hash :shared = (one=>1, two=>2, three=>3);

使用share()和:shared的區別在於後面這種標記的方式是在編譯期間完成的。另外,使用:shared屬性標記的方式能夠直接共享引用類型的數據,可是share()不容許,它使用prototype限制了只容許接收變量類型的參數,可使用&share()調用子程序的方式禁用prototype:shell

my $aref = &share([1 2 3]);

例如:數組

#!/usr/bin/perl
use strict;
use warnings;
use 5.010;

use threads;
use threads::shared;

my $foo :shared = 1;
my $bar = 1;
threads->create(
    sub {
        $foo++;
        $bar++;
        say "new thread: \$foo=$foo";   # 2
        say "new thread: \$bar=$bar";   # 2 
    }
)->join();

say "main thread: \$foo=$foo";   # 2
say "main thread: \$bar=$bar";   # 1

什麼數據會共享

並不是全部數據均可以共享,只有普通變量、數組、hash以及已共享數據的引用能夠共享。也就是:安全

Ordinary scalars
Array refs
Hash refs
Scalar refs
Objects based on the above

例如:數據結構

use threads;
use threads::shared;

my $var = 1;     #  未共享數據
my $svar :shared = 2;    # 共享標量
my @arr :shared = qw(perl python shell);   # 共享數組
my %hash :shared;       # 共享hash

my $thr = threads->new(\&mysub);

sub mysub {
    $hash{a} = 1;       # 成功
    $hash{b} = $var;    # 成功:$var是普通變量
    $hash{c} = \$svar;  # 成功:\$svar是已共享標量
    $hash{d} = @arr;    # 成功:普通數組
    $hash{e} = \@arr;   # 成功:已共享數組的引用

    # $hash{f} = \$var; # 失敗並die:$var未共享標量的引用
    # $hash{g} = [];    # 失敗:未共享數組的引用
    # $hash{h} = {a=>1};# 失敗:未共享hash的引用
}

$thr->join();     # join後文解釋
while( my ($key, $value) = each %hash ){
    say "$key => $value";
}

若是共享hash或array類型,那麼裏面的全部元素都對外可見,但並不意味着裏面的元素是共享的。共享hash/array和共享它們裏面的元素是獨立的,共享hash/array只意味着共享它們自身,但裏面的元素會暴露。反之,能夠直接共享某個元素,但hash/array自身不共享。(經測試,數組的元素沒法共享,hash的元素可正常共享)多線程

數據共享的問題:競態

當多個線程在某一時刻都訪問或修改同一個共享數據時,就會出現競態問題(race condition),它意味着多線程的數據競爭問題。app

例如:測試

use threads;
use threads::shared;

my $x :shared = 1;
my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub2);

$thr1->join();
$thr2->join();
print "$x\n";

sub sub1 { my $foo = $x; $x = $foo + 1; }
sub sub1 { my $bar = $x; $x = $bar + 1; }

執行上面的程序,結果可能會輸出2或3,由於兩個線程可能都取得x=1的值,也可能後一個線程取得前一個線程加法以後的值。prototype

之因此會發生競態問題,是由於對多個線程對同個數據的訪問和修改時間點沒法保證,這個時候數據是線程不安全的,也可稱之爲線程數據不一樣步。

因此,要解決數據競態問題,必須對共享數據的步驟進行協調,好比修改數據時必須保證只能有一個線程去修改,這能夠經過鎖的方式來實現。

變量鎖

threads::shared模塊中提供了一個lock()方法,用來將共享數據進行獨佔鎖定,被鎖定的數據沒法被其它線程修改,直到釋放鎖其它線程才能夠獲取鎖並修改數據。

例如:

use threads;
use threads::shared;

my $var :shared = 3;

sub mysub {
    ...
    lock($var);
    ...
}  # 鎖在這裏自動被釋放

沒有unlock()這樣直接釋放鎖的方法,而是在退出當前做用域的時候自動釋放鎖,就像詞法變量同樣。

另外,鎖住hash和數組的時候,僅僅只是鎖住它們自身,但lock()沒法去鎖hash/array中的元素。因此:

lock $myhash{'abc'};    # Error
lock %myhash;           # Correct

若是真想基於容器中元素進行鎖定,可使用線程信號量模塊Thread::Semaphore,後文會介紹。

另外,lock()是能夠遞歸的,在退出最外層lock()的做用域時釋放鎖。且遞歸時重複鎖定同一個變量是冪等的。例如:

my $x :shared;
doit();
sub doit {
    {
        {
            lock($x); # Wait for lock
            lock($x); # 沒任何做用,由於已經鎖過一次了
            {
                lock($x); # 沒任何做用,由於已經鎖過一次了
                {
                    lock($x); # 沒任何做用,由於已經鎖過一次了
                    lockit_some_more();
                }
            }
        } # *** Implicit unlock here ***
    }
}
sub lockit_some_more {
    lock($x); # 沒任何做用,由於已經鎖過一次了
} # Nothing happens here

死鎖問題

使用鎖來協調共享數據的步驟能解決競態問題,可是若是協調很差,很容易出現死鎖問題。死鎖是指兩個或更多進程/線程互相等待鎖的釋放,致使每個進程/線程都沒法釋放,從而出現無限等待的死循環問題。

例如:

use threads;
use threads::shared;

my $x :shared = 4;
my $y :shared = 'foo';

my $thr1 = threads->create(
    sub {
        lock($x);
        sleep 3;
        lock($y);
    }
);

my $thr2 = threads->create(
    sub {
        lock($y);
        sleep 3;
        lock($x);
    }
);

sleep 10;

上面的例子只要運行,兩個線程將會出現死鎖問題,由於thr1線程鎖住$x、thr2鎖住$y後,thr1申請$y的鎖將等待thr2先釋放,同理thr2申請$x的鎖將等待thr1先釋放。因而出現了互相等待的僵局,誰也不會也沒法釋放。

解決死鎖最簡單且最佳的方式是保證全部線程以相同的順序去鎖住每個數據。例如,全部線程都以先鎖住$x,再鎖住$y,最後鎖住$z的方式去執行代碼。

另外一個避免死鎖的解決方案是儘量讓鎖住共享數據的時間段變短,這樣出現僵局的概率就會小不少。

可是這兩種方式不少時候都派不上用場,由於須要用到鎖的狀況可能會比較複雜。下面介紹幾種方式。

線程隊列(Thread::Queue)

(Thread::Queue)隊列數據結構(FIFO)是線程安全的,它保證了某些線程從一端寫入數據,另外一些線程從另外一端讀取數據。只要隊列已經滿了,寫入操做就自動被阻塞直到有空間支持寫操做,只要隊列空了,讀取操做就會自動阻塞直到隊列中有數據可讀。這種模式自身就保證了線程安全性。

在Perl中要使用線程隊列,須要使用Thread::Queue模塊,使用方式很簡單。以下示例:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;

# 建立一個線程隊列
my $DataQueue = Thread::Queue->new();

# 建立線程
my $thr = threads->new(
    sub {
        # 在循環中讀取隊列
        while (my $DataElement = $DataQueue->dequeue()) {
            print "Poped $DataElement off the queue\n";
        }
    }
);

# 向隊列中寫入一個數據
$DataQueue->enqueue(12);
sleep 1;

# 再次寫入隊列3個數據
$DataQueue->enqueue('a','b','c');
sleep 3;

# 關閉隊列,讓讀取端再也不阻塞
$DataQueue->enqueue(undef);

# 等待子線程併爲其收屍
$thr->join();

關於Thread::Queue模塊的用法,參見:https://www.cnblogs.com/f-ck-need-u/p/10422293.html

Thread::Semaphore

Thread::Semaphore實現了線程信號量,能夠經過up()和down()來操做信號量,up()表示增長信號量的值,down()表示減信號量的值,按照鎖的角度來看這是申請鎖的操做,只要減法操做後信號量的值爲負數,此次減法操做就會被阻塞,就像被鎖住。

經過Thread::Semaphore的new()方法來建立一個信號量,若是不給任何參數,則默認建立一個信號量值爲1的信號量。若是給new()一個整數值N,則表示建立一個信號量值爲N的信號量。

使用信號量實現鎖機制的示例:

#!/usr/bin/perl

use 5.010;
use threads;
use Thread::Semaphore;

# 新建一個信號量
my $sem = Thread::Semaphore->new();

# 全局共享變量
my $gbvar :shared = 0;

my $thr1 = threads->create(\&mysub, 1);
my $thr2 = threads->create(\&mysub, 2);
my $thr3 = threads->create(\&mysub, 3);

# 每一個線程給全局共享變量依次加10
sub mysub {
    my $thr_id = shift;
    my $try_left = 10;
    my $local_value;
    sleep 1;
    while($try_left--){
        # 至關於獲取鎖
        $sem->down();

        $local_value = $gbvar;
        say "$try_left tries left for sub $thr_id "."(\$gbvar is $gbvar)";
        sleep 1;
        $local_value++;
        $gbvar = $local_value;
        
        # 至關於釋放鎖
        $sem->up();
    }
}

$thr1->join();
$thr2->join();
$thr3->join();

因爲信號量能夠鎖住任何片斷的代碼,因此它的鎖機制很是靈活。

實際上,up()和down()每次操做默認都只增、減1個信號量的值,但能夠給它們傳遞參數來一次性請求加N、減N個信號量值,對於減法操做,若是請求減N致使信號量的值爲負數,則該減法操做被阻塞,直到有足夠的信號量完成此次減法。這時的信號量就像是一個計數器同樣。

例如:

use threads;
use Thread::Semaphore;

# 建立一個信號量值爲5的信號量
my $sem = Thread::Semaphore->new(5);

my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub1);

sub sub1 {
    # 申請鎖
    $sem->down(5);  # 一次減5
    ... do something here ...
    $sem->up(5);  # 一次加5
}

$thr1->join();
$thr2->join();
相關文章
相關標籤/搜索