Beanstalkd消息隊列 -- php類Pheanstalk使用

業務場景

  商城訂單生成30分鐘後 若是未支付關閉訂單php

 

解決辦法

可使用延遲消息隊列   這裏咱們用的是beanstalkdgit

 

Beanstalkd介紹

Beanstalk,一個高性能、輕量級的分佈式內存隊列系統,最初設計的目的是想經過後臺異步執行耗時的任務來下降高容量Web應用系統的頁面訪問延遲。github

 

Beanstalkd特性

一、支持優先級(支持任務插隊)
二、延遲(實現定時任務)
三、持久化(定時把內存中的數據刷到binlog日誌)
四、預留(把任務設置成預留,消費者沒法取出任務,等某個合適時機再拿出來處理)
五、任務超時重發(消費者必須在指定時間內處理任務,若是沒有則認爲任務失敗,從新進入隊列)shell

 

任務job狀態

delayed 延遲狀態
ready 準備好狀態
reserved 消費者把任務讀出來,處理時
buried 預留狀態
delete 刪除狀態json



Beanstalkd安裝

yum -y install beanstalkd

  

Beanstalkd守護進程啓動

nohup beanstalkd -l 0.0.0.0 -p 11300 -b /home/software/binstalkd/binlogs &

1. -b表示開啓binlog,斷電後重啓自動恢復任務composer

2.  /home/software/binstalkd/binlogs是咱們已經建立好的一個文件目錄 這裏面主要放一些持久化的日誌 異步

 

Pheanstalk安裝

      composer require pda/pheanstalk分佈式

PheanstalkGit倉庫地址

    https://github.com/pda/pheanstalk

 

鏈接Beanstalkd

 
<?php
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

/**
 * 實例化beanstalk
 * 參數依次爲:ip地址 端口號默認11300 鏈接超時時間 是否長鏈接
 */
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
?>

 

producer.php 生產者

<?php

// Hopefully you're using Composer autoloading.
require_once __DIR__.'/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

/**
 * 實例化beanstalk
 * 參數依次爲:ip地址 端口號默認11300 鏈接超時時間 是否長鏈接
 */
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
//向tubeName管道中添加任務,返回任務ID
//put()方法有四個參數
//第一個任務的數據  
//第二個任務的優先級,值越小,越先處理 (默認爲0  no delay)
//第三個任務的延遲 單位:秒 (most urgent: 0, least urgent: 4294967295)
//第四個任務的ttr超時時間 單位:秒
$arr['name'] = 'mark';
$jobData = json_encode($arr);

$pheanstalk ->useTube('tubeName') ->put($jobData,1024,5);

  

Beanstalkd生產者方法

指定須要使用的管道性能

$tube = $pheanstalk->useTube('default');

向管道插入數據ui

$tube = $pheanstalk->useTube('default'); $put = $tube->put( 'hello, beanstalk', // 任務內容 1024, // 任務的優先級 10, // 不等待直接放到ready隊列中 60 // 處理任務的時間 );

或者:

$pheanstalk->putInTube('default', 'test1', 1024, 10, 60);

---------------------------------------------------------------------------------------------------------------------+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

consumer.php 消費者

<?php

// Hopefully you're using Composer autoloading.
require_once __DIR__.'/vendor/autoload.php';
use Pheanstalk\Pheanstalk;

/**
 * 實例化beanstalk
 * 參數依次爲:ip地址 端口號默認11300 鏈接超時時間 是否長鏈接
 */
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);

//監聽tubeName管道,忽略default管道
$job = $pheanstalk ->watch('tubeName') ->ignore('default') ->reserve();

echo $job->getData();

//刪除當前任務 $pheanstalk->delete($job);

  

Beanstalkd消費者方法

監聽管道

$tube = $pheanstalk->watch('user');

去除不須要監聽的管道

$tube = $pheanstalk->watch('user')->ignore('default');

以堵塞的方式監聽管道

$job = $pheanstalk->watch('user')->reserve(4); //堵塞時間爲4秒

列出全部已經監聽的管道

$pheanstalk->listTubesWatched();

watch + reserve 方法

$pheanstalk->reserveFromTube('default')

刪除當前任務

$job =  $pheanstalk->watch('default')->reserve(); $pheanstalk->delete($job);

將當前任務從新放入管道

$job =  $pheanstalk->watch('default')->reserve(); $pheanstalk->release($job);

爲任務續命(當處理任務的時間小於當前任務執行時間時)

$job =  $pheanstalk->watch('default')->reserve(); $pheanstalk->touch($job); //TODO

將任務預留

$job =  $pheanstalk->watch('default')->reserve(); $pheanstalk->bury($job);

將預留任務釋放(變爲reday狀態)

$job = $pheanstalk->peekBuried('default'); $pheanstalk->kickJob($job);

批量將預留任務釋放

$pheanstalk->userTube('default')->kick(999); //將id小於999的預留任務所有釋放

讀取當前準備就緒的任務(ready)

$job = $pheanstalk->peekReady('default');

讀取當前處於延遲狀態的任務(delayed)

$job = $pheanstalk->peekDelayed('default');

對管道設置延遲

$pheanstalk->pauseTube('default', 100); //設置100秒延遲

取消對管道的延遲

$pheanstalk->resumeTube('default');
相關文章
相關標籤/搜索