更多: http://www.zhihu.com/question/19946006
更多: http://www.zhihu.com/question/19946006
過去幾年中,咱們一直在使用、構建和宣傳消息隊列,咱們認爲它們是很使人敬畏的,這也不是什麼祕密。咱們相信對任何架構或應用來講,消息隊列都是一個相當重要的組件,下面是十個理由:php 1. 解耦html 在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。mysql |
2. 冗餘redis 有時在處理數據的時候處理過程會失敗。除非數據被持久化,不然將永遠丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。sql 3. 擴展性數據庫 由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。數組 |
4. 靈活性 & 峯值處理能力緩存 當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不一樣尋常的水平。在訪問量劇增的狀況下,你的應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住增加的訪問壓力,而不是由於超出負荷的請求而徹底崩潰。請查看咱們關於峯值處理能力的博客文章瞭解更多此方面的信息。安全 5. 可恢復性網絡 當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。 |
6. 送達保證 消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個"只送達一次"保證。不管有多少進程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是"預約"了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。 |
7.排序保證 在許多狀況下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。IronMO保證消息漿糊經過FIFO(先進先出)的順序來處理,所以消息在隊列中的位置就是從隊列中檢索他們的位置。 8.緩衝 在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行--寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。 |
9. 理解數據流 在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息系列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。 10. 異步通訊 不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。 |
咱們相信上述十個緣由,使得消息隊列成爲在進程或應用之間進行通訊的最好形式。咱們已經花費了一年時間來建立和學習IronMQ,咱們的客戶也經過消息隊列完成了許多難以想象的事情。隊列是建立強大的分佈式應用的關鍵,它能夠利用雲技術所提供的全部強大能量。 若是如今你想要開始使用一個高效的、可靠的、託管的消息隊列,下載IronMQ吧。若是你想聯繫咱們的工程師,諮詢如何把隊列集成到你的應用中去,他們隨時歡迎你的訪問:get.iron.io/chat。 |
最簡單的用mysql實現消息隊列:
http://www.jb51.net/article/30164.htm
Memcache 通常用於緩存服務。可是不少時候,好比一個消息廣播系統,須要一個消息隊列。直接從數據庫取消息,負載每每不行。若是將整個消息隊列用一個key緩存到memcache裏面.
對於一個很大的消息隊列,頻繁進行進行大數據庫的序列化 和 反序列化,有太耗費。下面是我用PHP 實現的一個消息隊列,只須要在尾部插入一個數據,就操做尾部,不用操做整個消息隊列進行讀取,與操做。可是,這個消息隊列不是線程安全的,我只是儘可能的避免了衝突的可能性。若是消息不是很是的密集,好比幾秒鐘才一個,仍是能夠考慮這樣使用的。
若是你要實現線程安全的,一個建議是經過文件進行鎖定,而後進行操做。下面是代碼:
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size < $offset) { return null; } $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }
項目中須要實現一個簡單的隊列,爲了這麼點小小的功能, 再去新搭建一個 RabbitMQ, Redis或者MongoDB 之類的東西, 感受有點動靜太大,爲了吃根香腸何須殺一頭豬? 因此考慮用手頭上現有的memcache 搭建一個簡單的內存消息隊列.
memcache功能太簡單了,只能 set get 和delete, 只能保存key-value的數據, 不能保存列表。 固然也能夠把一個列表給序列化了以後存進memcache, 可是會存在併發的問題, 每次保存數據(插隊或者出隊)的時候都要給數據加鎖,在高併發的狀況下很難保證數據的一致性!
可是memcache 有一個 increment 的操做,爲某一個鍵對應的值進行加1(其實是加法運算, 默認加1), 這個操做是原子性的, 因此咱們能夠經過這個來維護一個自增的id來保證數據的惟一。 再加上兩個指針來維護起始鍵值, 這樣就構建了一個簡單的但相隊列!!
<?php /** * memcache構建的簡單內存隊列 * * @author: jeffjing */ class memList { private $memcache; // memcache類 private $queKeyPrefix; //數據鍵前綴 private $startKey; //開始指針鍵 private $startKey; //結束指針鍵 public function __construct($key){ $this->queKeyPrefix = "MEMQUE_{$key}_"; $this->startKey = "MEMQUE_SK_{$key}"; $this->endKey = "MEMQUE_EK_{$key}"; } /** * 獲取列表 * 先拿到開始結束指針, 而後去拿數據 * * @return array */ public function getList(){ $startP = $this->memcache->get($this->startKey); $endP = $this->memcache->get($this->endKey); empty($startP) && $startP = 0; empty($endP) && $endP = 0; $arr = array(); for($i = $startP ; $i < $endP; ++$i) { $key = $this->queKeyPrefix . $i; $arr[] = $this->memcache->get($key); } return $arr; } /** * 插入隊列 * 結束指針後移,拿到一個自增的id * 再把值存到指針指定的位置 * * @return void */ public function in($value){ $index = $this->memcache->increment($this->endKey); $key = $this->queKeyPrefix . $index; $this->memcache->set($key, $value); } /** * 出隊 * 很簡單, 把開始值取出後開始指針後移 * * @return mixed */ public function out(){ $result = $this->memcache->get($this->startKey); $this->memcache->increment($this->startKey); return $result; } }