最近在作一個項目,須要用戶在提交相關信息後,分析信息內容,而後將分析結果推送到相關的用戶的信息模塊中,用到了beanstalk這個隊列系統。 php
beanstalkd介紹: html
Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統,最初設計的目的是想經過後臺異步執行耗時的任務來下降高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是一樣的風格,因此使用過memcached的用戶會以爲Beanstalkd似曾相識。 git
Beanstalkd中一個job的生命週期如圖所示。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,若是選擇延遲put,job就先到DELAYED狀態,等待時間事後才遷移到READY狀態。consumer獲取了當前READY的job後,該job的狀態就遷移到RESERVED,這樣其餘的consumer就不能再操做該job。當consumer完成該job後,能夠選擇delete, release或者bury操做;delete以後,job從系統消亡,以後不能再獲取;release操做能夠從新把該job狀態遷移回READY(也能夠延遲該狀態遷移操做),使其餘的consumer能夠繼續獲取和執行該job;有意思的是bury操做,能夠把該job休眠,等到須要的時候,再將休眠的job kick回READY狀態,也能夠delete BURIED狀態的job。正是有這些有趣的操做和狀態,才能夠基於此作出不少意思的應用,好比要實現一個循環隊列,就能夠將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。 github
特性: shell
爲了防止某個consumer長時間佔用任務但不能處理的狀況,Beanstalkd爲reserve操做設置了timeout時間,若是該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其餘consumer執行。 ubuntu
下載: centos
服務端:http://kr.github.io/beanstalkd/download.html 異步
客戶端:https://github.com/kr/beanstalkd/wiki/client-libraries 分佈式
安裝: memcached
ubuntu
sudo apt-get install beanstalkd
centos
yum install beanstalkd
源碼安裝
tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.9.tar.gz cd beanstalkd make install PERFIX=/usr/bin/beanstalkd
後臺啓動:
beanstalkd -l 地址 -p 端口號 -z 最大的任務大小(byte) -c &
若是是外部客戶端鏈接,ip地址要寫外網地址,這樣才能鏈接上
啓動選項
-b DIR wal directory
-f MS fsync at most once every MS milliseconds (use -f0 for "always fsync")
-F never fsync (default)
-l ADDR listen on address (default is 0.0.0.0)
-p PORT listen on port (default is 11300)
-u USER become user and group
-z BYTES set the maximum job size in bytes (default is 65535)
-s BYTES set the size of each wal file (default is 10485760)
(will be rounded up to a multiple of 512 bytes)
-c compact the binlog (default)
-n do not compact the binlog
-v show version information
-V increase verbosity
-h show this help
php客戶端的使用:我使用的是這個簡易的類 https://github.com/davidpersson/beanstalk
發送任務:
<?php //發送任務 require_once 'src/Socket/Beanstalk.php'; //實例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否長鏈接 'host' => 'ip地址', 'port' => 11600, //端口號默認11300 'timeout' => 3 //鏈接超時時間 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //選擇使用的tube $beanstalk->useTube('test'); //往tube中增長數據 $put = $beanstalk->put( 23, // 任務的優先級. 0, // 不等待直接放到ready隊列中. 60, // 處理任務的時間. 'hello, beanstalk' // 任務內容 ); if (!$put) { exit('commit job fail'); } $beanstalk->disconnect();
處理任務:
<?php require_once 'src/Socket/Beanstalk.php'; //實例化beanstalk $beanstalk = new Socket_Beanstalk(array( 'persistent' => false, //是否長鏈接 'host' => 'ip地址', 'port' => 11600, //端口號默認11300 'timeout' => 3 //鏈接超時時間 )); if (!$beanstalk->connect()) { exit(current($beanstalk->errors())); } //查看beanstalkd狀態 //var_dump($beanstalk->stats()); //查看有多少個tube //var_dump($beanstalk->listTubes()); $beanstalk->useTube('test'); //設置要監聽的tube $beanstalk->watch('test'); //取消對默認tube的監聽,能夠省略 $beanstalk->ignore('default'); //查看監聽的tube列表 //var_dump($beanstalk->listTubesWatched()); //查看test的tube當前的狀態 //var_dump($beanstalk->statsTube('test')); while (true) { //獲取任務,此爲阻塞獲取,直到獲取有用的任務爲止 $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk') //處理任務 $result = doJob($job['body']); if ($result) { //刪除任務 $beanstalk->delete($job['id']); } else { //休眠任務 $beanstalk->bury($job['id']); } //跳出無限循環 if (file_exists('shutdown')) { file_put_contents('shutdown', 'beanstalkd在'.date('Y-m-d H:i:s').'關閉'); break; } } $beanstalk->disconnect();