beanstalkd消息隊列使用

最近在作一個項目,須要用戶在提交相關信息後,分析信息內容,而後將分析結果推送到相關的用戶的信息模塊中,用到了beanstalk這個隊列系統。  php

beanstalkd介紹: html

Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統,最初設計的目的是想經過後臺異步執行耗時的任務來下降高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是一樣的風格,因此使用過memcached的用戶會以爲Beanstalkd似曾相識。 git


Beanstalkd設計裏面的核心概念:
◆ job
一個須要異步處理的任務,是Beanstalkd中的基本單元,須要放在一個tube中。
◆ tube
一個有名的任務隊列,用來存儲統一類型的job,是producer和consumer操做的對象。
◆ producer
Job的生產者,經過put命令來將一個job放到一個tube中。
◆ consumer
Job的消費者,經過reserve/release/bury/delete命令來獲取job或改變job的狀態。

Beanstalkd中一個job的生命週期如圖所示。一個job有READY, RESERVED, DELAYED, BURIED四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,若是選擇延遲put,job就先到DELAYED狀態,等待時間事後才遷移到READY狀態。consumer獲取了當前READY的job後,該job的狀態就遷移到RESERVED,這樣其餘的consumer就不能再操做該job。當consumer完成該job後,能夠選擇delete, release或者bury操做;delete以後,job從系統消亡,以後不能再獲取;release操做能夠從新把該job狀態遷移回READY(也能夠延遲該狀態遷移操做),使其餘的consumer能夠繼續獲取和執行該job;有意思的是bury操做,能夠把該job休眠,等到須要的時候,再將休眠的job kick回READY狀態,也能夠delete BURIED狀態的job。正是有這些有趣的操做和狀態,才能夠基於此作出不少意思的應用,好比要實現一個循環隊列,就能夠將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。 github

特性: shell

◆ 優先級
支持0到2**32的優先級,值越小,優先級越高,默認優先級爲1024。
◆ 持久化
能夠經過binlog將job及其狀態記錄到文件裏面,在Beanstalkd下次啓動時能夠經過讀取binlog來恢復以前的job及狀態。
◆ 分佈式容錯
分佈式設計和Memcached相似,beanstalkd各個server之間並不知道彼此的存在,都是經過client來實現分佈式以及根據tube名稱去特定server獲取job。
◆ 超時控制

爲了防止某個consumer長時間佔用任務但不能處理的狀況,Beanstalkd爲reserve操做設置了timeout時間,若是該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其餘consumer執行。 ubuntu

 下載: centos

服務端:http://kr.github.io/beanstalkd/download.html 異步

客戶端:https://github.com/kr/beanstalkd/wiki/client-libraries 分佈式

安裝: memcached

ubuntu 

sudo apt-get install beanstalkd

centos 

yum install beanstalkd

源碼安裝 

tar -zxvf /usr/bin/beanstalkd/beanstalkd-1.9.tar.gz

cd beanstalkd

make install PERFIX=/usr/bin/beanstalkd


後臺啓動:

beanstalkd -l 地址 -p 端口號 -z 最大的任務大小(byte) -c &

若是是外部客戶端鏈接,ip地址要寫外網地址,這樣才能鏈接上

啓動選項

 -b DIR   wal directory

 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help


php客戶端的使用:我使用的是這個簡易的類 https://github.com/davidpersson/beanstalk

發送任務:

<?php
//發送任務

require_once 'src/Socket/Beanstalk.php';

//實例化beanstalk
$beanstalk = new Socket_Beanstalk(array(
    'persistent' => false, //是否長鏈接
    'host' => 'ip地址',
    'port' => 11600,  //端口號默認11300
    'timeout' => 3    //鏈接超時時間
));

if (!$beanstalk->connect()) {
    exit(current($beanstalk->errors()));
}
//選擇使用的tube
$beanstalk->useTube('test');
//往tube中增長數據
$put = $beanstalk->put(
    23, // 任務的優先級.
    0,  // 不等待直接放到ready隊列中.
    60, // 處理任務的時間.
    'hello, beanstalk' // 任務內容
);

if (!$put) {
    exit('commit job fail');
}

$beanstalk->disconnect();

 處理任務:


<?php
require_once 'src/Socket/Beanstalk.php';
//實例化beanstalk
$beanstalk = new Socket_Beanstalk(array(
    'persistent' => false, //是否長鏈接
    'host' => 'ip地址',
    'port' => 11600,  //端口號默認11300
    'timeout' => 3    //鏈接超時時間
));

if (!$beanstalk->connect()) {
    exit(current($beanstalk->errors()));
}
//查看beanstalkd狀態
//var_dump($beanstalk->stats());

//查看有多少個tube
//var_dump($beanstalk->listTubes());

$beanstalk->useTube('test');

//設置要監聽的tube
$beanstalk->watch('test');

//取消對默認tube的監聽,能夠省略
$beanstalk->ignore('default');

//查看監聽的tube列表
//var_dump($beanstalk->listTubesWatched());

//查看test的tube當前的狀態
//var_dump($beanstalk->statsTube('test'));


while (true) {
    //獲取任務,此爲阻塞獲取,直到獲取有用的任務爲止
    $job = $beanstalk->reserve(); //返回格式array('id' => 123, 'body' => 'hello, beanstalk')

    //處理任務
    $result = doJob($job['body']);

    if ($result) {
        //刪除任務
        $beanstalk->delete($job['id']);
    } else {
        //休眠任務
        $beanstalk->bury($job['id']);
    }
    //跳出無限循環
    if (file_exists('shutdown')) {
        file_put_contents('shutdown', 'beanstalkd在'.date('Y-m-d H:i:s').'關閉');
        break;
    }
}
$beanstalk->disconnect();
相關文章
相關標籤/搜索