基於ThinkPHP使用Beanstalk的使用 入門篇(一)

Beanstalk,一個高性能、輕量級的分佈式內存隊列系統,最初設計的目的是想經過後臺異步執行耗時的任務來下降高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是一樣的風格,因此使用過memcached的用戶會以爲Beanstalkd似曾相識。php

高性能離不開異步,異步離不開隊列,而其內部都是Producer-Consumer模式的原理。服務器

Beanstalkd設計裏面的核心概念:composer

◆ job異步

一個須要異步處理的任務,是Beanstalkd中的基本單元,須要放在一個tube中。分佈式

◆ tubememcached

一個有名的任務隊列,用來存儲統一類型的job,是producer和consumer操做的對象。性能

◆ producer測試

Job的生產者,經過put命令來將一個job放到一個tube中。ui

◆ consumerspa

Job的消費者,經過reserve/release/bury/delete命令來獲取job或改變job的狀態。

Beanstalkd中一個job的生命週期。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,若是選擇延遲put,job就先到DELAYED狀態,等待時間事後才遷移到READY狀態。consumer獲取了當前READY的job後,該job的狀態就遷移到RESERVED,這樣其餘的consumer就不能再操做該job。當consumer完成該job後,能夠選擇delete, release或者bury操做;delete以後,job從系統消亡,以後不能再獲取;release操做能夠從新把該job狀態遷移回READY(也能夠延遲該狀態遷移操做),使其餘的consumer能夠繼續獲取和執行該job;有意思的是bury操做,能夠把該job休眠,等到須要的時候,再將休眠的job kick回READY狀態,也能夠delete BURIED狀態的job。正是有這些有趣的操做和狀態,才能夠基於此作出不少意思的應用,好比要實現一個循環隊列,就能夠將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。

Beanstalkd基於的源碼安裝和使用很簡單,在此略過。這裏重點介紹一下其幾個很nice的特性。

◆ 優先級

支持0到2**32的優先級,值越小,優先級越高,默認優先級爲1024。

◆ 持久化

能夠經過binlog將job及其狀態記錄到文件裏面,在Beanstalkd下次啓動時能夠經過讀取binlog來恢復以前的job及狀態。

◆ 分佈式容錯

分佈式設計和Memcached相似,beanstalkd各個server之間並不知道彼此的存在,都是經過client來實現分佈式以及根據tube名稱去特定server獲取job。

◆ 超時控制

爲了防止某個consumer長時間佔用任務但不能處理的狀況,Beanstalkd爲reserve操做設置了timeout時間,若是該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其餘consumer執行。

Beanstalkd不足之處:在使用中發現一個Beanstalkd尚無提供刪除一個tube的操做,只能將tube的job依次刪除,並讓Beanstalkd來自動刪除空tube。還有就是Beanstalkd不支持客戶端認證機制(開發者將應用場景定位在局域網)。

 

下面,咱們經過php類Pheanstalk來進行簡單入門。

1. composer安裝

切換到項目目錄,使用composer進行加載安裝:composer require pda/pheanstalk

(如出現報錯,請更新composer在下載:composer update)

2. 鏈接Beanstalk

--- 生產者 ---

關於如何安裝Beanstalk,在這裏就不進行說明了,操做很簡單,請自行在執行系統安裝好,才能使用。

做者是使用 4.0 版本的類庫

鏈接服務

注意的是,在4.0以前的版本,是直接使用 new Pheanstalk($host, $port, ...) 方式鏈接的,4.0版本使用 create 靜態方法進行鏈接。

鏈接成功,咱們來完成一個簡單測試,「像管道demo中,put進值」,如

查看demo管道信息

這樣,生產者就完成任務放進管道的操做。

3. 如何處理管道任務

--- 消費者 ---

咱們建立一個消費者的方法,經過while來處理管道每次產生的任務(這是模擬,實際生產中,下一篇文章會詳細說明),如:

如下的代碼是監聽管道demo,並將任務取出來處理

當咱們在終端執行時,我看到每次處理任務的數據,如:

整個生產者到消費者的流程就是這樣,至於具體業務,須要根據自身的狀況進行處理。

入門篇(一)完!

 

如下是整理了一些經常使用方法,僅供參考(來源於網上,感謝網友的提供)

//----------------------------------------維護類----------------------------------

//1.查看目前pheanStalkd狀態信息
//print_r($ph->stats()); 

//2.顯示目前存在的管道
//print_r($ph->listTubes()); 

//3.查看NewUsers管道的信息
//$ph->useTube('NewUsers')->put('test'); 
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道添加一個up任務
//print_r($ph->statsTube('NewUsers'));//3.查看NewUsers管道的信息

//6.查看指定管道中某一個任務的狀況
//$job = $ph->watch('NewUsers')->reserve(); //5.從管道中取出任務(消費)
//print_r($ph->statsJob($job)); //6.查看指定管道中某一個任務的狀況

//7.查看任務id爲1的任務詳情
//$job = $ph->peek(1);7.直接取出任務id爲1的任務 [注:beanstalkd中全部任務的id都具備惟一性] 
//print_r($ph->statsJob($job));//查看任務id爲1的任務詳情


//----------------------------------------生產類--------------------------------------

////第一種 put()

//$tube = $ph->useTube('NewUsers');//鏈接NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道添加任務four,並返回結果
//注: put()方法還有3個可選參數(依次爲: 優先級priority,延遲時間delay,任務超時重發ttr)

////第二種 putInTube() [注: putInTube()就是對useTube()和put()的封裝]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道添加任務three
////注: putInTube()方法還有3個可選參數(依次爲: 優先級priority,延遲時間delay,任務超時重發ttr)
//print_r($res);//返回任務id

//print_r($ph->statsTube('NewUsers'));//查看NewUsers管道的詳細狀況


//---------------------------------------消費類--------------------------------------

// 1.watch 監聽NewUsers管道 [ 注: watch()一樣能夠監聽多個管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//打印已經監聽的管道


// 2.watch 監聽多個管道
//$tube = $ph->watch('NewUsers')
//           ->watch('default');
//print_r($ph->listTubesWatched());//打印已經監聽的管道


// 3.ignore 監聽NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
//            ->ignore('default');
//print_r($ph->listTubesWatched());//打印已經監聽的管道


// 4.reserve 監聽NewUsers管道,而且取出任務
//$job = $ph->watch('NewUsers')
//          ->reserve();
//
////注reserve()有1個參數,阻塞的時間,過了阻塞時間,無論有沒有東西,直接返回
//
//var_dump($job);//打印已經取出的任務
//$ph->delete($job);//刪除已經取出的任務


// 5.putInTube/put 向NewUsers管道寫入任務 [ 注:此爲生產者方法,放到此處是爲了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.查看NewUsers管道詳細信息


// 6.release 將取出的任務放回ready狀態,還有2個參數(優先級和延遲)
//$job = $ph->watch('NewUsers')->reserve();//6.監聽NewUsers管道,並取出任務

//if (true) {
//    sleep(30);
//    $ph->release($job);//6.將任務取出以後,停留30秒,而後將任務狀態從新變爲ready
//} else {
//    $ph->delete($job);
//}


// 7.bury (預留) 將任務取出以後,發現後面執行的邏輯不成熟(好比發郵件,忽然發現郵件服務器掛掉了),
//或者說還不能執行後面的邏輯,須要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費

//$job = $ph->watch('NewUsers')->reserve();//取出任務
//$ph->bury($job);//取出任務後,將任務放到一邊(預留)

// 8.peekBuried() 將處在bury狀態的任務讀取出來
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//var_dump($ph->statsJob($job));//打印任務狀態(此時任務狀態應該是bury)

// 9.kickJob() 將處在bury任務狀態的任務轉化爲ready狀態
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//$ph->kickJob($job);

// 10.kick()  將處在bury任務狀態的任務轉化爲ready狀態,有第二個參數int, 批量將任務id小於此數值的任務轉化爲ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任務id小於65,而且任務狀態處於bury的任務所有轉化爲ready

// 11.peekReady() 將管道中處於ready狀態的任務讀出來
//$job = $ph->peekReady('NewUser');//將NewUser管道中處於ready狀態的任務讀取出來
//var_dump($job);
//$ph->delete($job);

// 12.peekDelay() 將管道中全部處於delay狀態的任務讀取出來
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);


// 13.pauseTube() 對整個管道進行延遲設置,讓管道處於延遲狀態
//$ph->pauseTube('NewUser',10);//設置管道NewUser延遲時間爲10s
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 14.resumeTube() 恢復管道,讓管道處於不延遲狀態,當即被消費
//$ph->resumeTube('NewUser');//取消管道NewUser的延遲狀態,變爲當即讀取
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 15.touch() 讓任務從新計算任務超時重發ttr時間,至關於給任務延長壽命

相關文章
相關標籤/搜索