beanstalkd消息隊列在生產環境的應用

        Beanstalkd 是一個高性能的消息隊列中間件,本博文宅鳥將介紹一下這個東東的使用。php

1、先經過概念讓你們瞭解Beanstalkd的特性和工做場景。html

        Beanstalkd 是一個輕量級消息中間件,它最大特色是將本身定位爲基於管道  (tube) 和任務 (job) 的工做隊列 (work-queue):java

Beanstalkd 支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 可以很好的支持分佈式的後臺任務和定時任務處理。python

它的內部實現採用 libevent, 服務器-客戶端之間用相似 memcached 的輕量級通信協議,具備有很高的性能。c++

儘管是內存隊列, beanstalkd 提供了 binlog 機制, 當重啓 beanstalkd 時,當前任務狀態可以從紀錄的本地 binlog 中恢復。git

管道 (tube):github

       管道相似於消息主題 (topic), 在一個 Beanstalkd 中能夠支持多個管道, 每一個管道都有本身的發佈者 (producer) 和消費者 (consumer). 管道之間互相不影響。 數據庫

任務 (job):json

       Beanstalkd 用任務 (job) 代替消息 (message) 的概念。與消息不一樣,任務有一系列狀態:vim

Beanstalkd


READY- 須要當即處理的任務,當延時 (DELAYED) 任務到期後會自動成爲當前任務;

DELAYED- 延遲執行的任務, 當消費者處理任務後, 能夠用將消息再次放回 DELAYED 隊列延遲執行;

RESERVED- 已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;

BURIED- 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;

DELETED- 消息被完全刪除。Beanstalkd 再也不維持這些消息。

任務優先級 (priority):

       任務 (job) 能夠有 0~2^32 個優先級, 0 表明最高優先級。 beanstalkd 採用最大最小堆 (Min-max heap) 處理任務優先級排序, 任什麼時候刻調用 reserve 命令的消費者老是能拿到當前優先級最高的任務, 時間複雜度爲 O(logn).

延時任務 (delay):

       有兩種方式能夠延時執行任務 (job): 生產者發佈任務時指定延時;或者當任務處理完畢後, 消費者再次將任務放入隊列延時執行 (RELEASE with <delay>)。這種機制能夠實現分佈式的 java.util.Timer,這種分佈式定時任務的優點是:若是某個消費者節點故障,任務超時重發 (time-to-run) 可以保證任務轉移到另外的節點執行。

任務超時重發 (time-to-run):

       Beanstalkd 把任務返回給消費者之後:消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態;不然 Beanstalkd 會認爲消息處理失敗,而後把任務交給另外的消費者節點執行。若是消費者預計在 TTR (time-to-run) 時間內沒法完成任務, 也能夠發送 touch 命令, 它的做用是讓 Beanstalkd 從系統時間從新計算 TTR (time-to-run).

任務預留 (buried):

       若是任務由於某些緣由沒法執行, 消費者能夠把任務置爲 buried 狀態讓 Beanstalkd 保留這些任務。管理員能夠經過 peek buried 命令查詢被保留的任務,而且進行人工干預。簡單的, kick <n> 可以一次性把 n 條被保留的任務踢回隊列。

Beanstalkd 協議:

       Beanstalkd 採用類 memcached 協議, 客戶端經過文本命令與服務器交互。這些命令能夠簡單的分紅三組:    

       生產類 - use <tube> / put <priority> <delay> <ttr> [bytes]:  

       生產者用 use 選擇一個管道 (tube), 而後用 put 命令向管道發佈任務 (job).    

       消費類 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

       消費者用 watch 選擇多個管道 (tube), 而後用 reserve 命令獲取待執行的任務,這個命令是阻塞的。客戶端直到有任務可執行才返回。當任務處理完畢後, 消費者能夠完全刪除任務 (DELETE), 釋聽任務讓別人處理 (RELEASE), 或者保留 (BURY) 任務。

       維護類 - peek job / peek delayed / peek ready / peek buried / kick <n>

用於維護管道內的任務狀態, 在不改變任務狀態的條件下獲取任務。能夠用消費類命令改變這些任務的狀態。

被保留 (buried) 的任務能夠用 kick 命令 "踢" 回隊列。

          協議文檔: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt


Beanstalkd 不足:

Beanstalkd 沒有提供主備同步 + 故障切換機制, 在應用中有成爲單點的風險。實際應用中,能夠用數據庫爲任務 (job) 提供持久化存儲。

Beanstalkd


另外, 和 memcached 相似, Beanstalkd 依賴 libevent 的單線程事件分發機制, 不能有效利用多核 cpu 的性能。這一點能夠經過單機部署多個實例克服。


2、部署安裝:

Beanstalkd 的安裝很是簡單:

在Ubuntu和debian下使用下面命令:

sudo apt-get install beanstalkd

安裝後編輯配置文件:

vim /etc/default/beanstalkd

163040445.jpg

把START=NO改成:START=yes便可

更多關於安裝能夠參考官網



經過命令能夠啓動、中止Beanstalk


/etc/init.d/beanstalkd start
lsof -i:11300
/etc/init.d/beanstalkd stop


163604656.jpg


啓動後,就能夠經過客戶端進行調用了:

Beanstalk支持多種客戶端語言:

php,java,perl,c,c++,lua,python,go,ruby等等(瞭解更多能夠來官網)。

咱們將經過php給你們介紹在生產環境下面的使用。

就拿錄視頻製程序使用到的Beanstalk來給你們介紹:

先介紹一下程序結構:

視頻錄製程序分爲兩個方面,一個是產生錄製任務的腳本(生產者),還有一個處理錄製任務腳本(消費者)。

首先把php的客戶端下載後,加入到項目中。下面把代碼貼出來:


生產者:

#!/usr/bin/php
<?php
require_once 'Configuration.php';
require_once 'Record.class.php';
require_once 'BeanStalk.class.php';
$now=time();
$model = new RecordModel ();
$records=$model->checkStartRecord($now);
//print_r($records);
//exit();
$beanstalk = BeanStalk::open ( array (
            'servers' => array (
                Configuration::$record_config['beanStak']
                ),
            'select' => 'random peek'
            ) );
$beanstalk->use_tube ( 'records' );
foreach ( $records as $record ) {
    $beanstalk->put ( 0, 0, 10, json_encode ( $record ) );
}
?>


消費者:

<?php
require_once('config.php');
require_once('func.php');
require_once('BeanStalk.class.php');
$beanstalk = BeanStalk::open(array(
            'servers'       => array( $config['beanStak'] ),
            'select'        => 'random peek'
            ));
$beanstalk->watch('records');
while(true){
    //$beanstalk->watch('records');
    $job = $beanstalk->reserve_with_timeout();
    if(is_object($job)){
        $data=$job->get();
        $json=json_decode($data,true);
        print_r($json);
        if(!empty($json["live_name"])&&!empty($json["start_time"])&&!empty($json["end_time"])&&!empty($json["vod_id"])){
            //print_r($json);
            if(!empty($json["afterplay"])&&$json["afterplay"]==1)
            $cmd="{$config['afterplaycmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}";
            else
            $cmd="{$config['recordcmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}";
               
            echo $cmd;
            $chkcmd="ps -ef |grep '".$cmd."'  |grep -v 'grep'|wc -l";
            //$chkcmd="ps -ef |wc -l";
            //echo $chkcmd;
            $count=system($chkcmd);
            //echo $count;
            if($count==0)
            {
                //system($cmd);
                exec($cmd,$res,$rc);
                //print_r($res);
                //print_r($rc);
            }
            Beanstalk::delete($job);   // Delete the job.
            $info=array();
            $info["vod_id"]=$json['vod_id'];
            $info["record_msg"]="startjob";
            $data=array();
            $data["type"]="reciveRecords";
            $data["message"]=$info;
            $url=$config['recordStatus'];
            $httpcode = 200;
            $result = test_api($httpcode,$url,"post",json_encode($data));
            print_r($data);
        }
        //$beanstalk->watch('records');
    }
    sleep(1);
}
?>

下面咱們介紹一個能夠管理Beanstalk的php工具,地址以下

https://github.com/jimbojsb/bstools

把該工具安裝後,就能夠查看Beanstalk的各類狀況了


165839569.jpg


165651192.jpg


到此結束,不足之處歡迎拍磚

相關文章
相關標籤/搜索