beanstalkd 消息隊列

概況:
Beanstalkd,一個高性能、輕量級的分佈式內存隊列系統,最初設計的目的是想經過後臺異步執行耗時的任務來下降高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。後來開源,如今有PostRank大規模部署和使用,天天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是一樣的風格,因此使用過memcached的用戶會以爲Beanstalkd似曾相識。

beanstalk核心概念:

job:一個須要異步處理的任務,須要放在一個tube中。
tube:一個有名的任務隊列,用來存儲統一類型的job
producer:job的生產者
consumer:job的消費者

簡單來講流程就一句話:
由 producer 產生一個任務 job ,並將 job 推動到一個 tube 中,
而後由 consumer 從 tube 中取出 job 執行(固然了,這一切的操做的前提是beanstalk服務正在運行中)。php

 

一個job有READY(時刻準備着被消費者取出), RESERVED(任務正在被一個消費者處理中), DELAYED(延遲任務,設定的延遲時間後進入ready狀態), BURIED(休眠中,須要轉移狀態後才能操做)四種狀態。當producer直接put一個job時,job就處於READY狀態,等待consumer來處理,若是選擇延遲put,job就先到DELAYED狀態,等待時間事後才遷移到READY狀態。consumer獲取了當前READY的job後,該job的狀態就遷移到RESERVED,這樣其餘的consumer就不能再操做該job。當consumer完成該job後,能夠選擇delete, release或者bury操做;delete以後,job從系統消亡,以後不能再獲取;release操做能夠從新把該job狀態遷移回READY(也能夠延遲該狀態遷移操做),使其餘的consumer能夠繼續獲取和執行該job;有意思的是bury操做,能夠把該job休眠,等到須要的時候,再將休眠的job kick回READY狀態,也能夠delete BURIED狀態的job。正是有這些有趣的操做和狀態,才能夠基於此作出不少意思的應用,好比要實現一個循環隊列,就能夠將RESERVED狀態的job休眠掉,等沒有READY狀態的job時再將BURIED狀態的job一次性kick回READY狀態。

beanstalkd擁有的一些特性:
++    producer產生的任務能夠給他分配一個優先級,支持0到2**32的優先級,值越小,優先級越高,默認優先級爲1024。
    優先級高的會被消費者首先執行
++    持久化,能夠經過binlog將job及其狀態記錄到文件裏面,在Beanstalkd下次啓動時能夠
    經過讀取binlog來恢復以前的job及狀態。
++    分佈式容錯,分佈式設計和Memcached相似,beanstalkd各個server之間並不知道彼此的存在,
    都是經過client來實現分佈式以及根據tube名稱去特定server獲取job。
++    超時控制,爲了防止某個consumer長時間佔用任務但不能處理的狀況,
    Beanstalkd爲reserve操做設置了timeout時間,若是該consumer不能在指定時間內完成job,
    job將被遷移回READY狀態,供其餘consumer執行。

html

枯燥的文字介紹結束,舉個例子看看如何在實際項目中應用

若是你有疑問爲何要用消息隊列,請跳到文末【使用消息隊列的10個理由】

這裏仍是要提醒一句:
消息隊列有不少用途,也在不少大型網站中有不一樣程度的使用,使用靈活,能解決不少
問題,可是! 在靈活的背後也是有不少潛在代價的,好比隊列意外掛掉,系統維護難度增大,濫用隊列致使
系統性能降低等。 因此,請確保你的業務邏輯適合使用消息隊列,而且你能處理好意外狀況。

測試環境 ( Ubuntu Server 14.04 + PHP5.5 + Beanstalk V1.10 )

例子分析:微博是一個很典型的例子:
1,發一個微博
2,推送給他的粉絲 (若是有100w個粉絲,這個地方會堵塞好久,用戶感覺到的就是延遲)
在微博上發佈一條內容要作上面兩件事情纔算完整,發一條微博只須要進行一次簡單的數據庫操做,
可是推送給他的粉絲卻要操做100w次數據庫,致使用戶發一個微博要等待很長的延遲才能返回結果發送成功。

採用隊列的方式,用戶發送一條微博立馬返回結果,發送成功,剩下的推送就放到隊列裏面異步執行,
推送並不須要特別及時,延遲過幾秒幾十秒都是能夠接受的。    

下面用代碼來實現上面的思路:

首先在ubuntu上安裝beanstalkd服務,
apt-get install beanstalkd
(官網地址:http://kr.github.io/beanstalkd/download.html)
運行beanstalk  /etc/init.d/beanstalkd start
若是啓動失敗,在 /etc/default/beanstalkd 中添加 START=yes

須要三個文件:
a.php (任務生產者)
b.php (任務消費者)
beanstalk.php (beanstalkd的php客戶端,這裏選的是 https://github.com/davidpersson/beanstalk)
beanstalk支持的客戶端下載:
https://github.com/kr/beanstalkd/wiki/client-librariespython

// a.php
// 僞代碼以下:
// 
require_once "beanstalk.php";

//接受參數
$user  = $_POST['uid'];  //發微博的用戶
$content = $_POST['content'];  //發送內容

// 插入微博 
// 取出當前插入微博的id => wid
// 查找出用戶的全部粉絲uid => fensi
// 把推送放進隊列
newtask($wid,$fensi);

function newtask($wid,$fensi)
{
    $beanstalk = new Client();

    $beanstalk->connect();

    $beanstalk->useTube('test');
    
    foreach ($fensi as $key => $value) {
        //參數說明  優先級  延遲執行  執行超時  任務字符串
        $task = $value . "#" . $wid;   //  粉絲id和微博id
        $beanstalk->put( 1024,  0,   60,   $task );
    }
    
    $beanstalk->disconnect();
}

 

 

//b.php

// 僞代碼以下:
// 
require_once "beanstalk.php";

$beanstalk = new Client();

$beanstalk->connect();

$beanstalk->watch('test');

while(true) 
{
    $job = $beanstalk->reserve(0); 

    if( !$job ) {
        // 這個地方必定要休眠  若是不休眠 while循環會致使cpu跑滿
        sleep(1);
        continue;
    }
    $result = $job['body'];

    $pos = explode("#", $result);
    $weibo = $pos[1];
    $uid = $pos[0];
    // 插入粉絲推送數據
    // .................
    // .................
    $beanstalk->delete($job['id']);

}

$beanstalk->disconnect();

 

 beanstalk.php 在這裏: https://github.com/davidpersson/beanstalkgit

 

使用消息隊列的10個理由:

1. 解耦
在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。
2. 冗餘
有時在處理數據的時候處理過程會失敗。除非數據被持久化,不然將永遠丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。
3. 擴展性
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。
4. 靈活性 & 峯值處理能力
當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不一樣尋常的水平。在訪問量劇增的狀況下,你的應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住增加的訪問壓力,而不是由於超出負荷的請求而徹底崩潰。請查看咱們關於峯值處理能力的博客文章瞭解更多此方面的信息。
5. 可恢復性
當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。
6. 送達保證
消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個"只送達一次"保證。不管有多少進程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是"預約"了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。
7.排序保證
在許多狀況下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。IronMO保證消息漿糊經過FIFO(先進先出)的順序來處理,所以消息在隊列中的位置就是從隊列中檢索他們的位置。
8.緩衝
在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行--寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。
9. 理解數據流
在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息系列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。
10. 異步通訊
不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。

補充一個:  多語言通訊,好比用php生產一個job,用python或者其餘語言做爲消費者來處理github

相關文章
相關標籤/搜索