beanstalk隊列應用

隊列用處

  • 異步處理:用戶註冊後查找數據庫發送郵件推薦商品,把推薦商品放入隊列。
  • 系統解耦:上面的過程將用戶註冊和推薦商品分離開來了。
  • 定時任務: 把考生的試卷提交信息提早放入隊列,天天固定某個時間實現自動批改選擇題。

beanstalkd工具

beanstalked是一個簡單的快速的通用的輕量級內存隊列,能夠實現百萬級任務處理。php

特性

  • 優先級:能夠設置任務的優先級
  • 延遲: 設置任務多少秒後才容許被消費者使用
  • 持久化:定時刷新數據到文件,服務器掛掉後數據依舊存在
  • 超時會重發:消費者必須在指定時間內完成任務,不然就會從新放入管道
  • 任務預留:消費者先暫時跳過任務不處理

任務和管道git

任務的狀態轉換github

ready:任務已經準備好了,能夠給消費者使用。
delay:任務放入管道的時候設置了延遲時間。
reserve:消費者把任務讀取出來
buried:任務先放在一邊,之後還會用
delete:任務從隊列刪除數據庫

安裝

地址:https://github.com/kr/beanstalkd服務器

能夠經過源代碼安裝或者包管理工具(brew,apt-get)安裝,安裝完成後輸入beanstalkd會由相應的命令.異步

image.png工具

首先使用下面的命令綁定本地地址和端口號ui

beanstalkd -l 127.0.0.1 -p 11300 &

beanstalkd的類管理

可使用pheantalkphp類操做beanstalkdpheantalk是對beanstalkd命令的封裝。能夠經過該類實現對任務的操做。spa

https://github.com/pda/pheanstalk命令行

require 'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
//查看beanstalk的當前信息命令行的封裝
print_r($pheanstalk->stats());

beanstalkd當前狀態.png

好比查看某個任務的信息

print_r($pheanstalk->putInTube('mytube','job',1000));
$job = $pheanstalk->watch('mytube')->reserve();
print_r( $pheanstalk->statsJob($job));

job的信息.png

還有其餘的操做以下

require 'vendor/autoload.php';

use Pheanstalk\Pheanstalk;

//如今命令行輸入  beanstalkd -l 127.0.0.1 -p 11300 & 開啓

$pheanstalk = new Pheanstalk('127.0.0.1',11300);

//查看beanstalk的當前信息

// 能夠經過命令行對管道進行管理,本php類對管道的管理就是對beanstalkd命令行的封裝

print_r($pheanstalk->stats());

// 當前的管道
print_r($pheanstalk->listTubes());

// 查看默認的管道
print_r($pheanstalk->statsTube('default'));

//在mytube 裏面放入任務 one,使用默認的優先級,延遲重發時間,超時重發時間,
$pheanstalk->useTube('mytube')->put('one');


//從管道取出任務

$job = $pheanstalk->watch('mytube')->reserve();
//根據job的編號獲取信息
$job = $pheanstalk->peek(1);

$pheanstalk->delete($job);//刪除任務


$pheanstalk->watch('mytube')->delete($job);
//會把任務從新放入管道,並把任務設置爲ready狀態
$pheanstalk->release($job);

//把任務放在一邊暫時不處理,條件成熟了在讀取出來
$pheanstalk->bury($job);

$pheanstalk->peekBuried('mytube');// 獲取處理bury狀態的任務

$pheanstalk->kickJob($job);//會把job轉變成ready狀態

$pheanstalk->kick(200);//把job編號小於200的變成ready狀態

//--- 獲取某個狀態的任務
$pheanstalk->peekDelayed();
$pheanstalk->peekReady();
$pheanstalk->peekBuried();

$pheanstalk->pauseTube('mytube',10);//阻塞整個管道時間

生成者和消費者

生產者將任務放入隊列

require  'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
$pheanstalk->useTube('mytube')->put('one',1024,10,3);
$pheanstalk->useTube('mytube')->put('two',1023);
$pheanstalk->useTube('mytube')->put('three',1025);
print_r($pheanstalk->stats());

消費者處理任務

require  'vendor/autoload.php';
use Pheanstalk\Pheanstalk;
//終端等待表示沒有生產者產生數據
$pheanstalk = new Pheanstalk("127.0.0.1",11300);
//reserve能夠設置阻塞時間,不設置會一直等待,watch 能夠同時監聽兩個管道,
$job = $pheanstalk->watch('mytube')->watch('default')->reserve(); // reserver是獲取ready的job
$pheanstalk->ignore('default');//忽略管道
//處理代碼
if ($job->getData() == 'one') {
    sleep(2);// 超時重回隊列3秒,睡眠兩秒,delete以前的代碼還能執行1秒,若是想在加
    $pheanstalk->watch('mytube')->touch($job);//續命
}

具體對隊列和任務的操做能夠結合實際邏輯和設置任務和管道的狀態。

做者:鴻雁長飛光不度 連接:https://www.jianshu.com/p/82c4c6fee450 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

相關文章
相關標籤/搜索