Beanstalkd 是一個高性能的消息隊列中間件,本博文宅鳥將介紹一下這個東東的使用。php
1、先經過概念讓你們瞭解Beanstalkd的特性和工做場景。html
Beanstalkd 是一個輕量級消息中間件,它最大特色是將本身定位爲基於管道 (tube) 和任務 (job) 的工做隊列 (work-queue):java
Beanstalkd 支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 可以很好的支持分佈式的後臺任務和定時任務處理。python
它的內部實現採用 libevent, 服務器-客戶端之間用相似 memcached 的輕量級通信協議,具備有很高的性能。c++
儘管是內存隊列, beanstalkd 提供了 binlog 機制, 當重啓 beanstalkd 時,當前任務狀態可以從紀錄的本地 binlog 中恢復。git
管道 (tube):github
管道相似於消息主題 (topic), 在一個 Beanstalkd 中能夠支持多個管道, 每一個管道都有本身的發佈者 (producer) 和消費者 (consumer). 管道之間互相不影響。 數據庫
任務 (job):json
Beanstalkd 用任務 (job) 代替消息 (message) 的概念。與消息不一樣,任務有一系列狀態:vim
READY- 須要當即處理的任務,當延時 (DELAYED) 任務到期後會自動成爲當前任務;
DELAYED- 延遲執行的任務, 當消費者處理任務後, 能夠用將消息再次放回 DELAYED 隊列延遲執行;
RESERVED- 已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
BURIED- 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
DELETED- 消息被完全刪除。Beanstalkd 再也不維持這些消息。
任務優先級 (priority):
任務 (job) 能夠有 0~2^32 個優先級, 0 表明最高優先級。 beanstalkd 採用最大最小堆 (Min-max heap) 處理任務優先級排序, 任什麼時候刻調用 reserve 命令的消費者老是能拿到當前優先級最高的任務, 時間複雜度爲 O(logn).
延時任務 (delay):
有兩種方式能夠延時執行任務 (job): 生產者發佈任務時指定延時;或者當任務處理完畢後, 消費者再次將任務放入隊列延時執行 (RELEASE with <delay>)。這種機制能夠實現分佈式的 java.util.Timer,這種分佈式定時任務的優點是:若是某個消費者節點故障,任務超時重發 (time-to-run) 可以保證任務轉移到另外的節點執行。
任務超時重發 (time-to-run):
Beanstalkd 把任務返回給消費者之後:消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態;不然 Beanstalkd 會認爲消息處理失敗,而後把任務交給另外的消費者節點執行。若是消費者預計在 TTR (time-to-run) 時間內沒法完成任務, 也能夠發送 touch 命令, 它的做用是讓 Beanstalkd 從系統時間從新計算 TTR (time-to-run).
任務預留 (buried):
若是任務由於某些緣由沒法執行, 消費者能夠把任務置爲 buried 狀態讓 Beanstalkd 保留這些任務。管理員能夠經過 peek buried 命令查詢被保留的任務,而且進行人工干預。簡單的, kick <n> 可以一次性把 n 條被保留的任務踢回隊列。
Beanstalkd 協議:
Beanstalkd 採用類 memcached 協議, 客戶端經過文本命令與服務器交互。這些命令能夠簡單的分紅三組:
生產類 - use <tube> / put <priority> <delay> <ttr> [bytes]:
生產者用 use 選擇一個管道 (tube), 而後用 put 命令向管道發佈任務 (job).
消費類 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>
消費者用 watch 選擇多個管道 (tube), 而後用 reserve 命令獲取待執行的任務,這個命令是阻塞的。客戶端直到有任務可執行才返回。當任務處理完畢後, 消費者能夠完全刪除任務 (DELETE), 釋聽任務讓別人處理 (RELEASE), 或者保留 (BURY) 任務。
維護類 - peek job / peek delayed / peek ready / peek buried / kick <n>
用於維護管道內的任務狀態, 在不改變任務狀態的條件下獲取任務。能夠用消費類命令改變這些任務的狀態。
被保留 (buried) 的任務能夠用 kick 命令 "踢" 回隊列。
協議文檔: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt
Beanstalkd 不足:
Beanstalkd 沒有提供主備同步 + 故障切換機制, 在應用中有成爲單點的風險。實際應用中,能夠用數據庫爲任務 (job) 提供持久化存儲。
另外, 和 memcached 相似, Beanstalkd 依賴 libevent 的單線程事件分發機制, 不能有效利用多核 cpu 的性能。這一點能夠經過單機部署多個實例克服。
2、部署安裝:
Beanstalkd 的安裝很是簡單:
在Ubuntu和debian下使用下面命令:
sudo apt-get install beanstalkd
安裝後編輯配置文件:
vim /etc/default/beanstalkd
把START=NO改成:START=yes便可
更多關於安裝能夠參考官網
經過命令能夠啓動、中止Beanstalk
/etc/init.d/beanstalkd start lsof -i:11300 /etc/init.d/beanstalkd stop
啓動後,就能夠經過客戶端進行調用了:
Beanstalk支持多種客戶端語言:
php,java,perl,c,c++,lua,python,go,ruby等等(瞭解更多能夠來官網)。
咱們將經過php給你們介紹在生產環境下面的使用。
就拿錄視頻製程序使用到的Beanstalk來給你們介紹:
先介紹一下程序結構:
視頻錄製程序分爲兩個方面,一個是產生錄製任務的腳本(生產者),還有一個處理錄製任務腳本(消費者)。
首先把php的客戶端下載後,加入到項目中。下面把代碼貼出來:
生產者:
#!/usr/bin/php <?php require_once 'Configuration.php'; require_once 'Record.class.php'; require_once 'BeanStalk.class.php'; $now=time(); $model = new RecordModel (); $records=$model->checkStartRecord($now); //print_r($records); //exit(); $beanstalk = BeanStalk::open ( array ( 'servers' => array ( Configuration::$record_config['beanStak'] ), 'select' => 'random peek' ) ); $beanstalk->use_tube ( 'records' ); foreach ( $records as $record ) { $beanstalk->put ( 0, 0, 10, json_encode ( $record ) ); } ?>
消費者:
<?php require_once('config.php'); require_once('func.php'); require_once('BeanStalk.class.php'); $beanstalk = BeanStalk::open(array( 'servers' => array( $config['beanStak'] ), 'select' => 'random peek' )); $beanstalk->watch('records'); while(true){ //$beanstalk->watch('records'); $job = $beanstalk->reserve_with_timeout(); if(is_object($job)){ $data=$job->get(); $json=json_decode($data,true); print_r($json); if(!empty($json["live_name"])&&!empty($json["start_time"])&&!empty($json["end_time"])&&!empty($json["vod_id"])){ //print_r($json); if(!empty($json["afterplay"])&&$json["afterplay"]==1) $cmd="{$config['afterplaycmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}"; else $cmd="{$config['recordcmd']} {$json["live_name"]} {$json["vod_id"]} {$json['start_time']} {$json['end_time']}"; echo $cmd; $chkcmd="ps -ef |grep '".$cmd."' |grep -v 'grep'|wc -l"; //$chkcmd="ps -ef |wc -l"; //echo $chkcmd; $count=system($chkcmd); //echo $count; if($count==0) { //system($cmd); exec($cmd,$res,$rc); //print_r($res); //print_r($rc); } Beanstalk::delete($job); // Delete the job. $info=array(); $info["vod_id"]=$json['vod_id']; $info["record_msg"]="startjob"; $data=array(); $data["type"]="reciveRecords"; $data["message"]=$info; $url=$config['recordStatus']; $httpcode = 200; $result = test_api($httpcode,$url,"post",json_encode($data)); print_r($data); } //$beanstalk->watch('records'); } sleep(1); } ?>
下面咱們介紹一個能夠管理Beanstalk的php工具,地址以下
https://github.com/jimbojsb/bstools
把該工具安裝後,就能夠查看Beanstalk的各類狀況了
到此結束,不足之處歡迎拍磚