Perl線程隊列:Thread::Queue

(Thread::Queue)隊列數據結構(FIFO)是線程安全的,它保證了某些線程從一端寫入數據,另外一些線程從另外一端讀取數據。只要隊列已經滿了,寫入操做就自動被阻塞直到有空間支持寫操做,只要隊列空了,讀取操做就會自動阻塞直到隊列中有數據可讀。這種模式自身就保證了線程安全性。安全

建立隊列

new()
new(LIST)
new()能夠建立一個空隊列,或者根據已有的列表建立隊列,列表中的元素會按照前後順序放進這個隊列中。數據結構

哪些元素可放進隊列

能夠被threads::shared共享的數據均可以放進隊列。包括:線程

Ordinary scalars
Array refs
Hash refs
Scalar refs
Objects based on the above

放進隊列的數據必須是已經共享的,若是沒有共享,則會自動克隆(遞歸克隆)一份後將其共享並將共享後放進隊列。scala

例如,下面首先會建立一個空隊列,因爲@arr未共享,因此會先經過&shared([])將一個空列表(匿名列表引用)共享,並將@arr中的3個元素放進去,而後再將這個共享的匿名列表放進隊列。code

my @arr = qw(foo bar baz);
$q->enqueue(\@arr);

可是下面的數據是已經共享的,將會直接放進隊列中,而不須要先克隆一份。遞歸

my @arr :shared = qw(foo bar baz);
$q->enqueue(\@arr);

注意是整個列表引用,它將是隊列中的單個元素,而不是將那三個元素放進隊列。另外,須要克隆的時候,原始數據結構不會被共享,而是共享它的匿名結構的克隆。索引

操做隊列的基本方法

enqueue(LIST)
將列表中的元素放進隊列的尾部,給定的列表能夠只有一個元素。默認狀況下隊列長度能夠無限增加,但能夠經過limit來設置隊列

dequeue()
dequeue(COUNT)
從隊列的頭部移除COUNT(默認爲1)個元素,並返回它們。若是隊列中的元素數量少於請求移除的數量,則線程將被阻塞直到有足夠的元素可返回it

dequeue_nb()
dequeue_nb(COUNT)
以非阻塞的方式請求從隊列頭部移除COUNT(默認爲1)個元素並返回它們,若是隊列中元素個數少於請求移除的數量,則當即返回已存在的那些元素而不會阻塞等待,若是隊列爲空,則當即返回undefthread

dequque_timed(TIMEOUT)
dequque_timed(TIMEOUT, COUNT)
從隊列的頭部移除COUNT(默認爲1)個元素,並返回它們。若是隊列中的元素個數少於請求移除的個數,則阻塞直到有足夠的元素能夠返回或者阻塞直到超時。若是等待到了超時,則返回隊列中已存在的元素,若是隊列爲空,則返回undef。若是省略了TIMEOUT或者定義爲undef或小於等於0的值,則等價於dequeue_nb

timeout能夠是以秒爲單位計時的正數、小數(如四、2.5等),也能夠是從1970-01-01 00:00:00距離如今已過去的秒數,即epoch時間戳。

pending()
返回隊列中還有多少個元素,若是隊列已被結束(end())了或已經沒有元素了,則返回undef。

$QUEUE->limit = N
設置隊列的最大長度爲N,若是N=0則表示無限制。設置了limit後,後續的enqueue會被阻塞直到pending的數量小於limit的值,可是須要注意的是,在一次性向隊列中enqueue多個元素致使跨了limit值時不會阻塞。見示例:

my $q = Thread::Queue->new(1, 2);
$q->limit = 4;
$q->enquque(3,4,5);  # 不會阻塞
$q->enqueue(6);      # 阻塞,直到隊列中元素少於4

my $size = $q->limit;  # 返回當前的隊列限制值,可能會返回undef
$q->limit = 0;     # 再也不限制隊列長度

end()
聲明這個隊列已經不會再有元素放進來了,也就是告訴對端要關閉隊列,使其不要等待或阻塞。這也能夠經過enqueue(undef)發送undef的方式來實現。

更多操做隊列的方法

peek()
peek(INDEX)
從隊列中返回指定索引位(注意:從隊列頭部開始計數)的元素但卻不移除它,若是不提供INDEX則默認返回隊列頭部的元素(index=0)。INDEX能夠是負數,-1表示隊列的最尾部元素,-2表示尾部倒數第二個元素。

若是指定索引處元素不存在(即索引越界)或者隊列爲空,則返回undef。

記住,peek不會從隊列中將數據移除掉,因此操做一個peek出來的數據可能會影響到隊列中的對應元素。見後文示例。

insert(INDEX, LIST)
將list中的元素插入到INDEX(從隊列頭部計數)位置處。負數索引表示從隊列尾部開始計數。若是指定的INDEX大於當前隊列最大索引值,則直接插入到隊列的尾部(正數INDE)或隊列的頭部(負數INDEX)。例如:

$q->enqueue(1,2,3,4);
$q->insert(1, qw(foo bar));  # 1, foo, bar, 2, 3, 4

$q->enqueu(1, 2, 3, 4);
$q->insert(-2, qw(foo bar));  # 1, 2, foo, bar, 3, 4

extract()
extract(INDEX)
extract(INDEX, COUNT)
以非阻塞的方式移除並返回指定索引位置處(從隊列頭部開始計數)指定個數的元素(默認爲1個)。若是不給任何參數,則等價於dequeue_nb。若是元素個數不足,則能返回多少算多少。若是索引位置越界,則返回undef或空列表。

例如:

$q->enqueue(1,2,3,4);
my $item = $q->extract(2);  # 3,隊列包含:1 2 4
my @items = $q->extract(1, 3); # 返回(2, 4),隊列包含:1

因爲peek方法能夠直接從隊列中獲取元素但卻不將其從隊列中移除,這可能會致使隊列中的引用類型元素在隊列外被修改。例如,下面的代碼:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue;

my $q = Thread::Queue->new();

# 放非引用數據到隊列
$q->enqueue(1, 2, 3)

# 放引用數據到隊列
$q->enqueue(['a', 'b', 'c'])

my $thr = threads->new(
    sub {
        sleep 1;
        while(my $item = $q->dequeue()){
            # 若是是引用,要當心數據被其它線程修改
            if (ref $item){
                foreach (@$item){
                    print "ele: $_\n";
                }
            } else {
                print "item: $item\n";
            }
        }
    }
);

my $num = $q->peek();
$num = 11;     # 不影響,由於是非引用數據

my $list = $q->peek(3);
$$list[1] = 'bb';   # 將影響隊列中對應元素

$q->end();   # 關閉隊列
$thr->join;

爲了防止隊列中的元素值被其它線程修改,可使用threads::shared中的lock()將隊列鎖在一個代碼塊內。

{
    lock($q);
    my $item = $q->peek();
    if($item...){
        ...
    }
}  # 釋放鎖
相關文章
相關標籤/搜索