消息隊列使用

「消息」是在兩臺計算機間傳送的數據單位。消息能夠很是簡單,例如只包含文本字符串;也能夠更復雜,可能包含 嵌入對象
消息被髮送到隊列中。「消息隊列」是在消息的傳輸過程當中保存消息的容器。消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人。隊列的主要目的是提供路由並保證消息的傳遞;若是發送消息時接收者不可用,消息隊列會保留消息,直到能夠成功地傳遞它。
一篇文章:使用消息隊列的10個理由

過去幾年中,咱們一直在使用、構建和宣傳消息隊列,咱們認爲它們是很使人敬畏的,這也不是什麼祕密。咱們相信對任何架構或應用來講,消息隊列都是一個相當重要的組件,下面是十個理由:php

1. 解耦html

在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。mysql

 

 

2. 冗餘redis

有時在處理數據的時候處理過程會失敗。除非數據被持久化,不然將永遠丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。sql

3. 擴展性數據庫

由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。數組

 

4. 靈活性 & 峯值處理能力緩存

當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不一樣尋常的水平。在訪問量劇增的狀況下,你的應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住增加的訪問壓力,而不是由於超出負荷的請求而徹底崩潰。請查看咱們關於峯值處理能力的博客文章瞭解更多此方面的信息。安全

5. 可恢復性網絡

當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。

6. 送達保證

消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個"只送達一次"保證。不管有多少進程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是"預約"了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。

7.排序保證

在許多狀況下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。IronMO保證消息漿糊經過FIFO(先進先出)的順序來處理,所以消息在隊列中的位置就是從隊列中檢索他們的位置。

8.緩衝

在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行--寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。

9. 理解數據流

在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息系列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。

10. 異步通訊

不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。

咱們相信上述十個緣由,使得消息隊列成爲在進程或應用之間進行通訊的最好形式。咱們已經花費了一年時間來建立和學習IronMQ,咱們的客戶也經過消息隊列完成了許多難以想象的事情。隊列是建立強大的分佈式應用的關鍵,它能夠利用雲技術所提供的全部強大能量。

若是如今你想要開始使用一個高效的、可靠的、託管的消息隊列,下載IronMQ吧。若是你想聯繫咱們的工程師,諮詢如何把隊列集成到你的應用中去,他們隨時歡迎你的訪問:get.iron.io/chat

英文原文:Top 10 Uses For A Message Queue

最簡單的用mysql實現消息隊列:

http://www.jb51.net/article/30164.htm

php Memcache 中實現消息隊列

Memcache 通常用於緩存服務。可是不少時候,好比一個消息廣播系統,須要一個消息隊列。直接從數據庫取消息,負載每每不行。若是將整個消息隊列用一個key緩存到memcache裏面.

對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費。下面是我用PHP 實現的一個消息隊列,只須要在尾部插入一個數據,就操做尾部,不用操做整個消息隊列進行讀取,與操做。可是,這個消息隊列不是線程安全的,我只是儘可能的避免了衝突的可能性。若是消息不是很是的密集,好比幾秒鐘才一個,仍是能夠考慮這樣使用的。 
若是你要實現線程安全的,一個建議是經過文件進行鎖定,而後進行操做。下面是代碼: 

 

class Memcache_Queue 
{ 
private $memcache; 
private $name; 
private $prefix; 
function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") 
{ 
if ($memcache == null) { 
throw new Exception("memcache object is null, new the object first."); 
} 
$this->memcache = $memcache; 
$this->name = $name; 
$this->prefix = $prefix; 
$this->maxSize = $maxSize; 
$this->front = 0; 
$this->real = 0; 
$this->size = 0; 
} 
function __get($name) 
{ 
return $this->get($name); 
} 
function __set($name, $value) 
{ 
$this->add($name, $value); 
return $this; 
} 
function isEmpty() 
{ 
return $this->size == 0; 
} 
function isFull() 
{ 
return $this->size == $this->maxSize; 
} 
function enQueue($data) 
{ 
if ($this->isFull()) { 
throw new Exception("Queue is Full"); 
} 
$this->increment("size"); 
$this->set($this->real, $data); 
$this->set("real", ($this->real + 1) % $this->maxSize); 
return $this; 
} 
function deQueue() 
{ 
if ($this->isEmpty()) { 
throw new Exception("Queue is Empty"); 
} 
$this->decrement("size"); 
$this->delete($this->front); 
$this->set("front", ($this->front + 1) % $this->maxSize); 
return $this; 
} 
function getTop() 
{ 
return $this->get($this->front); 
} 
function getAll() 
{ 
return $this->getPage(); 
} 
function getPage($offset = 0, $limit = 0) 
{ 
if ($this->isEmpty() || $this->size < $offset) { 
return null; 
} 
$keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); 
$num = 1; 
for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) 
{ 
$keys[] = $this->getKeyByPos($pos); 
$num++; 
if ($limit > 0 && $limit == $num) { 
break; 
} 
} 
return array_values($this->memcache->get($keys)); 
} 
function makeEmpty() 
{ 
$keys = $this->getAllKeys(); 
foreach ($keys as $value) { 
$this->delete($value); 
} 
$this->delete("real"); 
$this->delete("front"); 
$this->delete("size"); 
$this->delete("maxSize"); 
} 
private function getAllKeys() 
{ 
if ($this->isEmpty()) 
{ 
return array(); 
} 
$keys[] = $this->getKeyByPos($this->front); 
for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) 
{ 
$keys[] = $this->getKeyByPos($pos); 
} 
return $keys; 
} 
private function add($pos, $data) 
{ 
$this->memcache->add($this->getKeyByPos($pos), $data); 
return $this; 
} 
private function increment($pos) 
{ 
return $this->memcache->increment($this->getKeyByPos($pos)); 
} 
private function decrement($pos) 
{ 
$this->memcache->decrement($this->getKeyByPos($pos)); 
} 
private function set($pos, $data) 
{ 
$this->memcache->set($this->getKeyByPos($pos), $data); 
return $this; 
} 
private function get($pos) 
{ 
return $this->memcache->get($this->getKeyByPos($pos)); 
} 
private function delete($pos) 
{ 
return $this->memcache->delete($this->getKeyByPos($pos)); 
} 
private function getKeyByPos($pos) 
{ 
return $this->prefix . $this->name . $pos; 
} 
} 
View Code

 項目中須要實現一個簡單的隊列,爲了這麼點小小的功能, 再去新搭建一個 RabbitMQ, Redis或者MongoDB 之類的東西, 感受有點動靜太大,爲了吃根香腸何須殺一頭豬? 因此考慮用手頭上現有的memcache 搭建一個簡單的內存消息隊列.

         memcache功能太簡單了,只能 set get 和delete, 只能保存key-value的數據, 不能保存列表。 固然也能夠把一個列表給序列化了以後存進memcache, 可是會存在併發的問題, 每次保存數據(插隊或者出隊)的時候都要給數據加鎖,在高併發的狀況下很難保證數據的一致性!

        可是memcache 有一個 increment 的操做,爲某一個鍵對應的值進行加1(其實是加法運算, 默認加1), 這個操做是原子性的, 因此咱們能夠經過這個來維護一個自增的id來保證數據的惟一。 再加上兩個指針來維護起始鍵值, 這樣就構建了一個簡單的但相隊列!!       

 

<?php
/**
 * memcache構建的簡單內存隊列
 * 
 * @author: jeffjing
 */
class memList {
    private $memcache; // memcache類
    
    private $queKeyPrefix; //數據鍵前綴
    private $startKey; //開始指針鍵
    private $startKey; //結束指針鍵
    
    public function __construct($key){
        $this->queKeyPrefix = "MEMQUE_{$key}_";
        $this->startKey = "MEMQUE_SK_{$key}";
        $this->endKey = "MEMQUE_EK_{$key}";    
    }
    
    /**
     * 獲取列表
     *     先拿到開始結束指針, 而後去拿數據
     * 
     * @return array
     */
    public function getList(){
        $startP = $this->memcache->get($this->startKey);
        $endP = $this->memcache->get($this->endKey);        
        empty($startP) && $startP = 0;
        empty($endP) && $endP = 0;
    
        $arr = array();
        for($i = $startP ; $i < $endP; ++$i) {
            $key = $this->queKeyPrefix . $i;
            $arr[] = $this->memcache->get($key);
        }
        return $arr;
    }
    
    /**
     * 插入隊列
     *   結束指針後移,拿到一個自增的id
     *   再把值存到指針指定的位置
     *   
     *   @return void
     */
    public function in($value){
        $index = $this->memcache->increment($this->endKey);
        $key = $this->queKeyPrefix . $index;
        $this->memcache->set($key, $value);
    }
    
    /**
     * 出隊
     *    很簡單, 把開始值取出後開始指針後移
     * 
     * @return mixed
     */
    public function out(){
        $result = $this->memcache->get($this->startKey);
        $this->memcache->increment($this->startKey);
        return $result;
    }
    
}
View Code

Web應用中的輕量級消息隊列-MySQL

 
Web應用中爲何會須要消息隊列?主要緣由是因爲在高併發環境下,因爲來不及同步處理,請求每每會發生堵塞,好比說,大量的insert,update之類的請求同時到達 mysql,直接致使無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。經過使用消息隊列,咱們能夠異步處理請求,從而緩解系統的壓力。在Web2.0的時代,高併發的狀況愈來愈常見,從而使消息隊列有成爲居家必備的趨勢,相應的也涌現出了不少實現方案,像Twitter之前就使用RabbitMQ實現消息隊列服務,如今又轉而使用Kestrel來實現消息隊列服務,此外還有不少其餘的選擇,好比說:ActiveMQ,ZeroMQ等。
 
上述消息隊列的軟件中,大多爲了實現AMQP,STOMP,XMPP之類的協議,變得極其重量級,但在不少Web應用中的實際狀況是:咱們只是想找到一個緩解高併發請求的解決方案,不須要雜七雜八的功能,一個輕量級的消息隊列實現方式纔是咱們真正須要的。
第一感受是能不能使用memcached來 實現消息隊列?稍加考慮後就會發現它不合適,由於memcached僅僅支持鍵值方式的操做,沒有排序之類的功能,因此若是要用它來實現消息隊列,則必須 本身經過某個鍵來保存數組形式的隊列,不過這樣的話,在操做隊列的時候很容易丟失數據,好比說咱們要添加一個消息,則需先取出現有隊列,而後把消息保存到 隊列尾部,最後保存隊列,單純使用memcached的話,因爲咱們沒法保證整個過程的原子性,因此當處理若干個併發請求時,各個請求間可能會互相覆蓋, 丟失數據就在所不免。另外,memcached只是內存鍵值緩存而已,一旦宕機,數據就消失了。http://www.002pc.com
memcacheq的出現解決了上面的問題,它在memcached的基礎上實現了消息隊列,以php客戶端爲例:
消息從尾部入棧:memcache_set
消息從頭部出棧:memcache_get
memcacheq依附於memcached之上,因此你能夠經過現有的memcached工具來操做它,這無疑是它的一大優點,但它也有一個很大的缺點,那就是memcacheq自己的開發維護彷佛並不活躍,若是遇到問題的話,你極可能須要本身動手解決。
目前看來,我更推薦下面這種解決方案,那就是redis,若是不瞭解,能夠參考我之前的文章,表面上看,redis和memcached差很少,也是鍵值操做,可是redis自己實現了list,相關操做也能夠保證是原子的,因此能夠很天然的經過list來實現消息隊列: http://pingban.002pc.com/   淘寶平板電腦排行榜  
消息從尾部入棧:RPUSH
消息從頭部出棧:LPOP
redis自己雖然是一個新項目,但頗有朝氣,開發維護也很活躍,若是你的下一個Web應用裏須要使用輕量級的消息隊列,不妨使用它。
此外,還有很多其餘的選擇可供嘗試,好比說MySQL第三方的Q4M引擎,經過擴展SQL語法來操做消息隊列,也是一個不錯的選擇。
套用網絡流行語:那些重量級軟件實現的不是你要的功能,而只是獨在高處不勝寒的寂寞,因此沒必要迷戀其中,它們只是傳說而已。
相關文章
相關標籤/搜索