pthreads v3下的worker和pool的使用

有些人會想,明明用thread已經能夠很好的工做了,爲何還要搞個worker和pool?php

之因此要用到worker和pool仍是由於效率,由於系統建立一個新線程代價是比較昂貴,每一個建立的線程會複製當前執行的整個上下文。mysql

儘量的重用線程可讓咱們的程序更高效。sql

一個簡單的worker例子:數據庫

<?php

//建立自定義work類,給work取個名字,方便查看
class Work extends Worker
{
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

class Task extends Thread
{
    private $num;

    public function __construct($num)
    {
        $this->num = $num;
    }

    public function run()
    {
        //計算累加和
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work : {$this->worker->getName()} task : {$total} \n";
        sleep(1);
    }
}

//建立一個worker線程
$work = new Work('a');

$work->start();

for ($i = 1; $i <= 10; $i++) {
    //將Task對象壓棧到worker線程中
    //這個時候Task對象就可使用worker線程上下文(變量,函數等)
    $work->stack(new Task($i));
}

//循環的清理任務,會阻塞主線程,直到棧中任務都執行完畢
while ($work->collect()) ;

//關閉worker
$work->shutdown();

上面代碼在運行的時候,計算結果會每隔一秒出來一條,也就是10個task對象是運行在1個worker線程上的。函數

若是10個task對象是分別在獨立空間運行的,sleep()函數就不會起做用,他們各自sleep並不會影響其餘線程。fetch

把上面的代碼修改一下:this

<?php

//建立自定義work類,給work取個名字,方便查看
class Work extends Worker
{
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

class Task extends Thread
{
    private $num;

    public function __construct($num)
    {
        $this->num = $num;
    }

    public function run()
    {
        //計算累加和
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work : {$this->worker->getName()} task : {$total} \n";
        sleep(1);
    }
}

//建立二個worker線程
$work1 = new Work('a');
$work2 = new Work('b');

$work1->start();
$work2->start();

for ($i = 1; $i <= 10; $i++) {
    if ($i <= 5) {
        $work1->stack(new Task($i));
    } else {
        $work2->stack(new Task($i));
    }
}

//循環的清理任務,會阻塞主線程,直到棧中任務都執行完畢
while ($work1->collect() || $work2->collect()) ;

//關閉worker
$work1->shutdown();
$work2->shutdown();

這裏咱們建立2個worker線程,讓10個task對象分別壓棧到2個worker中。線程

這時能夠看到,計算結果是一對一對的出來,說明10個task對象跑在了2個worker線程上。對象

至於須要建立多少個worker線程,和多少個task對象,就看自已的需求了。blog

worker還有一個好處就是能夠重用worker中的對象和方法。咱們能夠在worker中建立一個鏈接數據庫對象,方便各task調用。

<?php

class DB extends Worker
{
    //注意這裏設置爲靜態成員,pdo鏈接自己是不能在上下文中共享的
    //聲明爲靜態成員,讓每一個worker有自已的pdo鏈接
    private static $db = null;
    public $msg = 'i from db';

    public function run()
    {
        self::$db = new PDO('mysql:host=192.168.33.226;port=3306;dbname=test;charset=utf8', 'root', '');
    }

    public function getDb()
    {
        return self::$db;
    }
}

class Task extends Thread
{
    private $id;
    //注意,這裏不要給成員設置默認值,$result成員是線程對象是不可變的,不能被改寫
    private $result;

    public function __construct($id)
    {
        $this->id = $id;
    }

    public function run()
    {
        //獲取worker中的數據庫鏈接
        $db = $this->worker->getDb();
        $ret = $db->query("select * from tb_user where id = {$this->id}");
        $this->result = $ret->fetch(PDO::FETCH_ASSOC);
        //訪問worker中的成員變量msg
        echo "data : {$this->result['id']} {$this->result['name']} \t worker data : {$this->worker->msg} \n";
    }
}

//建立一個worker線程
$work = new DB();

$work->start();

for ($i = 1; $i <= 5; $i++) {
    $work->stack(new Task($i));
}

//循環的清理任務,會阻塞主線程,直到棧中任務都執行完畢
while ($work->collect()) ;

//關閉worker
$work->shutdown();

tb_user表你們能夠隨意建立,我這裏爲了演示只建立了id和name字段

運行結果以下:

 

若是說worker是對線程的重用,那麼pool就是對worker更高的抽象了,能夠同時管理多個worker。

<?php

//之因此要建立一個Id線程類,主要是爲了給work取個不一樣的ID,方便查看,哪些task線程屬於哪一個work中
class Id extends Thread
{
    private $id;

    public function getId()
    {
        //防止出現id混亂,這裏使用同步操做
        $this->synchronized(function () {
            ++$this->id;
        });
        return $this->id;
    }
}

class Work extends Worker
{
    private $id;

    public function __construct(Id $obj)
    {
        $this->id = $obj->getId();
    }

    public function getId()
    {
        return $this->id;
    }
}

class Task extends Thread
{
    private $num = 0;

    public function __construct($num)
    {
        $this->num = $num;
    }

    //計算累加和
    public function run()
    {
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work id : {$this->worker->getId()} task : {$total} \n";
    }
}

//建立pool,可容納3個work對象
$pool = new Pool(3, 'Work', [new Id()]);

//循環的把20個task線程提交到pool中的work對象上運行
for ($i = 1; $i <= 20; $i++) {
    $pool->submit(new Task($i));
}

//循環的清理任務,會阻塞主線程,直到任務都執行完畢
while ($pool->collect()) ;

//關閉pool
$pool->shutdown();

運行結果以下:

相關文章
相關標籤/搜索