[perl] 多線程理解

Thread:在使用多線程處理比較大的數據量的掃描,遇到讀寫文件可能死鎖的問題。 多線程

Perl 線程的生命週期

1.使用 threads 包的 create() 方法:

use threads; 

sub say_hello { 
    printf("Hello thread! @_.\n"); 
    return( rand(10) ); 
} 

my $t1 = threads->create( \&say_hello, "param1", "param2" ); 
my $t2 = threads->create( "say_hello", "param3", "param4" ); 
my $t3 = threads->create( 
sub { 
    printf("Hello thread! @_\n"); 
    return( rand(10) ); 
}, "param5", "param6" );



2.join 和 detach 方法

線程一旦被成功建立,它就馬上開始運行了,這個時候你面臨兩種選擇,分別是 join 或者 detach 這個新建線程。固然你也能夠什麼都不作,不過這可不是一個好習慣、 函數

從字面上來理解,join 就是把新建立的線程結合到當前的主線程中來,把它當成是主線程的一部分,使他們合二爲一。join 會觸發兩個動做,首先,主線程會索取新建線程執行結束之後的返回值;其次,新建線程在執行完畢並返回結果之後會自動釋放它本身所佔用的系統資源。例如 ui

join收割新建線程 spa

#!/usr/bin/perl 
# 
use threads; 

sub func { 
 sleep(1); 
 return(rand(10)); 
} 

my $t1 = threads->create( \&func ); 
my $t2 = threads->create( \&func ); 

printf("do something in the main thread\n"); 

my $t1_res = $t1->join(); 
my $t2_res = $t2->join(); 

printf("t1_res = $t1_res\nt2_res = $t2_res\n");

由此咱們不難發現,調用 join 的時機是一個十分有趣的問題。若是調用 join 方法太早,新建線程還沒有執行完畢,天然就沒法返回任何結果,那麼這個時候,主線程就不得不被阻塞,直到新建線程執行完畢以後,才能得到返回值,而後資源會被釋放,join 才能結束,這在很大程度上破話了線程之間的並行性。相反,若是調用 join 方法太晚,新建線程早已執行完畢,因爲一直沒有機會返回結果,它所佔用的資源就一直沒法獲得釋放,直到被 join 爲止,這在很大程度上浪費了寶貴的系統資源。所以,join 新建線程的最好時機應該是在它剛剛執行完畢的時候,這樣既不會阻塞當前線程的執行,又能夠及時釋放新建線程所佔用的系統資源。  線程

foreach ( threads->list(threads::joinable) ){
        $_->join( );
}

咱們再來看看 detach 方法,這也許是最省心省力的處理方法了。從字面上來理解,detach 就是把新建立的線程與當前的主線程剝離開來,讓它今後和主線程無關。當你使用 detach 方法的時候,代表主線程並不關心新建線程執行之後返回的結果,新建線程執行完畢後 Perl 會自動釋放它所佔用的資源。 scala

detach剝離線程 調試

#!/usr/bin/perl 
# 
use threads; 
use Config; 

sub say_hello { 
 my ( $name ) = @_; 
 printf("Hello World! I am $name.\n"); 
} 
my $t1 = threads->create( \&say_hello, "Alex" ); 
$t1->detach(); 
printf("doing something in main thread\n"); 
sleep(1);



一個新建線程一旦被 detach 之後,就沒法再 join 了。當你使用 detach 方法剝離線程的時候,有一點須要特別注意,那就是你須要保證被建立的線程先於主線程結束,不然你建立的線程會被迫結束,除非這種結果正是你想要的,不然這也許會形成異常狀況的出現,並增長程序調試的難度。

3.線程的消亡 code

大多數狀況下,你但願你建立的線程正常退出,這就意味着線程所對應的函數體在執行完畢後返回並釋放資源。例如在清單 5 的示例中,新建線程被 join 之後的退出過程。但是,若是因爲 detach 不當或者因爲主線因某些意外的異常提早結束了,儘管它所建立的線程可能還沒有執行完畢,可是他們仍是會被強制停止,正所謂皮之不存,毛將焉附。這時你也許會獲得一個相似於「Perl exited with active threads」的警告。 生命週期

固然,你也能夠顯示地調用 exit() 方法來結束一個線程,不過值得注意的是,默認狀況下,若是你在一個線程中調用了 exit() 方法, 其餘線程都會隨之一塊兒結束,在不少狀況下,這也許不是你想要的,若是你但願 exit() 方法只在調用它的線程內生效,那麼你在建立該線程的時候就須要設置’ exit ’ => ’ thread_only ’。例如 隊列

爲某個線程設置exit屬性

#!/usr/bin/perl 
# 
use threads; 

sub say_hello { 
 printf("Hello thread! @_.\n"); 
 sleep(10); 
 printf("Bye\n"); 
} 

sub quick_exit { 
 printf("I will be exit in no time\n"); 
 exit(1); 
} 

my $t1 = threads->create( \&say_hello, "param1", "param2" ); 
my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit ); 

$t1->join(); 
$t2->join();



若是你但願每一個線程的 exit 方法都只對本身有效,那麼在每次建立一個新線程的時候都去要顯式設置’ exit ’ => ’ thread_only ’屬性顯然有些麻煩,你也能夠在引入 threads 包的時候設置這個屬性在全局範圍內有效,例如

use threads ('exit' => 'threads_only'); 

sub func { 
 ... 
 if( $condition ) { 
 exit(1); 
 } 
} 

my $t1 = threads->create( \&func ); 
my $t2 = threads->create( \&func ); 

$t1->join(); 
$t2->join();




共享與同步

threads::shared

#!/usr/bin/perl 
 # 

 use threads; 
 use threads::shared; 
 use strict; 

 my $var   :shared  = 0;       # use :share tag to define 
 my @array :shared = (); # use :share tag to define 
 my %hash = (); 
 share(%hash);                  # use share() funtion to define 


 sub start { 
 $var = 100; 

 @array[0] = 200; 
 @array[1] = 201; 

 $hash{'1'} = 301; 
 $hash{'2'} = 302; 
 } 

 sub verify { 
    sleep(1);                      # make sure thread t1 execute firstly 
    printf("var = $var\n");     # var=100 

 for(my $i = 0; $i < scalar(@array); $i++) { 
        printf("array[$i] = $array[$i]\n");    # array[0]=200; array[1]=201 
 } 

 foreach my $key ( sort( keys(%hash) ) ) { 
 printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302 
 } 
 } 

 my $t1 = threads->create( \&start ); 
 my $t2 = threads->create( \&verify ); 

 $t1->join(); 
 $t2->join();

死鎖常是多線程程序中最隱蔽的問題,每每難以發現與調試,也增長了排查問題的難度。爲了不在程序中死鎖的問題,在程序中咱們應該儘可能避免同時獲取多個共享變量的鎖,若是沒法避免,那麼一是要儘可能使用相同的順序來獲取多個共享變量的鎖,另外也要儘量地細化上鎖的粒度,減小上鎖的時間。

use threads::shared; 

 # in thread 1 
 { 
    lock( $share );        # lock for 3 seconds 
    sleep(3);               # other threads can not lock again 
 } 
 # unlock implicitly now after the block 

 # in thread 2 
 { 
    lock($share);          # will be blocked, as already locked by thread 1 
    $share++;               # after thread 1 quit from the block 
 } 
 # unlock implicitly now after the block



use threads; 
 use threads::shared; 

 { 
    lock(@share);          # the array has been locked 
    lock(%hash);           # the hash has been locked 
    sleep(3);               # other threads can not lock again 
 } 

 { 
    lock($share[1]);     # error will occur 
    lock($hash{key});    # error will occur 
 }



信號量:Thread::Semaphore

my $semaphore = Thread::Semaphore->new( $max_threads ); #信號量
my $mutex = Thread::Semaphore->new( 1 );   #互斥量

區別:

1. 互斥量用於線程的互斥,信號量用於線程的同步。

這是互斥量和信號量的根本區別,也就是互斥和同步之間的區別。

互斥:是指某一資源同時只容許一個訪問者對其進行訪問,具備惟一性和排它性。但互斥沒法限制訪問者對資源的訪問順序,即訪問是無序的。

同步:是指在互斥的基礎上(大多數狀況),經過其它機制實現訪問者對資源的有序訪問。在大多數狀況下,同步已經實現了互斥,特別是全部寫入資源的狀況一定是互斥的。少數狀況是指能夠容許多個訪問者同時訪問資源

以上區別是主要想記住的。

note:信號量能夠用來實現互斥量的功能

2. 互斥量值只能爲0/1,信號量值能夠爲非負整數。

也就是說,一個互斥量只能用於一個資源的互斥訪問,它不能實現多個資源的多線程互斥問題。信號量能夠實現多個同類資源的多線程互斥和同步。當信號量爲單值信號量是,也能夠完成一個資源的互斥訪問。

3. 互斥量的加鎖和解鎖必須由同一線程分別對應使用,信號量能夠由一個線程釋放,另外一個線程獲得。

use threads; 
use threads::shared; 
use Thread::Semaphore; 

my $s = Thread::Semaphore->new(); 
$s->down();                # P operation 
... 
$s->up();                  # V operation



Thread::Queue:生產者-消費者模型對多線程隊列的使用。

生產者能夠不斷地在線程隊列上作 enqueue 操做,而消費者只須要不斷地在線程隊列上作 dequeue 操做,這就很簡單地實現了生產者和消費者之間同步的問題。

#!/usr/bin/perl 
 # 

 use threads; 
 use Thread::Queue; 

 my $q = Thread::Queue->new(); 

 sub produce { 
    my $name = shift; 
    while(1) { 
        my $r = int(rand(100)); 
        $q->enqueue($r); 
        printf("$name produce $r\n"); 
        sleep(int(rand(3))); 
    } 
 } 

 sub consume { 
    my $name = shift; 
    while(my $r = $q->dequeue()) { 
        printf("consume $r\n"); 
    } 
 } 

 my $producer1 = threads->create(\&produce, "producer1"); 
 my $producer2 = threads->create(\&produce, "producer2"); 
 my $consumer1 = threads->create(\&consume, "consumer2"); 

 $producer1->join(); 
 $producer2->join(); 
 $consumer1->join();
相關文章
相關標籤/搜索