爲何須要消息隊列python
系統中引入消息隊列機制是對系統一個很是大的改善。例如一個web系統中,用戶作了某項操做後須要發送郵件通知到用戶郵箱中。你可使用同步方式讓用戶等待郵件發送完成後反饋給用戶,可是這樣可能會由於網絡的不肯定性形成用戶長時間的等待從而影響用戶體驗。
有些場景下是不可能使用同步方式等待完成的,那些須要後臺花費大量時間的操做。例如極端例子,一個在線編譯系統任務,後臺編譯完成須要30分鐘。這種場景的設計不可能同步等待後在回饋,必須是先反饋用戶隨後異步處理完成,再等待處理完成後根據狀況再此反饋用戶與否。
另外適用消息隊列的狀況是那些系統處理能力有限的狀況下,先使用隊列機制把任務暫時存放起來,系統再一個個輪流處理掉排隊的任務。這樣在系統吞吐量不足的狀況下也能穩定的處理掉高併發的任務。
消息隊列能夠用來作排隊機制,只要系統須要用到排隊機制的地方就可使用消息隊列來做。
rabbitmq的優先級作法
目前成熟的消息隊列產品有不少,著名的例如rabbitmq。它使用起來相對仍是比較簡單的,功能也相對比較豐富,通常場合下是徹底夠用的。可是有個很煩人的就是它不支持優先級。
例如一個發郵件的任務,某些特權用戶但願它的郵件可以更加及時的發送出去,至少比普通用戶要優先對待。默認狀況下rabbitmq是沒法處理掉 的,扔給rabbitmq的任務都是FIFO先進先出。可是咱們可使用一些變通的技巧來支持這些優先級。建立多個隊列,併爲rabbitmq的消費者設 置相應的路由規則。
例如默認狀況下有這樣一個隊列,咱們拿list來模擬 [task1, task2, task3],消費者輪流按照FIFO的原則一個個拿出task來處理掉。若是有高優先級的任務進來,它也只能跟在最後被處理[task1, task2, task3, higitask1]. 可是若是使用兩個隊列,一個高優先級隊列,一個普通優先級隊列。 普通優先級[task1, task2, task3], 高優先級[hightask1 ] 而後咱們設置消費者的路由讓消費者隨機從任意隊列中取數據便可。
而且咱們能夠定義一個專門處理高優先級隊列的消費者,它空閒的時候也不處理低優先級隊列的數據。這相似銀行的VIP櫃檯,普通客戶在銀行取號排隊,一個VIP來了他雖然沒有從取號機裏拿出一個排在普通會員前面的票,可是他仍是能夠更快地直接走VIP通道。
使用rabbitmq來作支持優先級的消息隊列的話,就像是上面所述同銀行VIP會員同樣,走不一樣的通道。可是這種方式只是相對的優先級,作不 到絕對的優先級控制,例如我但願某一個優先級高的任務在絕對意義上要比其餘普通任務優先處理掉,這樣上面的方案是行不通的。由於rabbitmq的消費者 只知道再本身空閒的狀況下從本身關心的隊列中「隨機」取某一個隊列裏面的第一個數據來處理,它無法控制優先取找哪個隊列。或者更加細粒度的優先級控制。 或者你係統裏面設置的優先級有10多種。這樣使用rabbitmq也是很難實現的。
可是若是使用redis來作隊列的話上面的需求均可以實現。
使用redis怎麼作消息隊列
首先redis它的設計是用來作緩存的,可是因爲它自身的某種特性使得他能夠用來作消息隊列。它有幾個阻塞式的API可使用,正是這些阻塞式的API讓他有作消息隊列的能力。
試想一下在」數據庫解決全部問題「的思路下,不使用消息隊列也是能夠完成你的需求的。咱們把任務所有存放在數據庫而後經過不斷的輪詢方式來取任 務處理。這種作法雖然能夠完成你的任務可是作法很粗劣。可是若是你的數據庫接口提供一個阻塞的方法那麼就能夠避免輪詢操做了,你的數據庫也能夠用來作消息 隊列,只不過目前的數據庫尚未這樣的接口。
另外作消息隊列的其餘特性例如FIFO也很容易實現,只須要一個List對象從頭取數據,從尾部塞數據便可實現。
redis能作消息隊列得益於他list對象blpop brpop接口以及Pub/Sub(發佈/訂閱)的某些接口。他們都是阻塞版的,因此能夠用來作消息隊列。
redis消息隊列優先級的實現
一些基礎redis基礎知識的說明
redis> blpop tasklist 0
"im task 01"
這個例子使用blpop命令會阻塞方式地從tasklist列表中取頭一個數據,最後一個參數就是等待超時的時間。若是設置爲0則表示無限等 待。另外redis存放的數據都只能是string類型,因此在任務傳遞的時候只能是傳遞字符串。咱們只須要簡單的將負責數據序列化成json格式的字符 串,而後消費者那邊再轉換一下便可。
這裏咱們的示例語言使用python,連接redis的庫使用redis-py. 若是你有些
編程基礎把它切換成本身喜歡的語言應該是沒問題的。
1.簡單的FIFO隊列
複製代碼
import redis, time
def handle(task):
print task
time.sleep(4)
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop('tasklist', 0)
handle(result[1])
if __name__ == "__main__":
main()
複製代碼
上例子即便一個最簡單的消費者,咱們經過一個無限循環不斷地從redis的隊列中取數據。若是隊列中沒有數據則沒有超時的阻塞在那裏,有數據則取出往下執行。
通常狀況取出來是個複雜的字符串,咱們可能須要將其格式化後做爲再傳給處理函數,可是爲了簡單咱們的例子就是一個普通字符串。另外例子中的處理函數不作任何處理,僅僅sleep 用來模擬耗時的操做。
咱們另開一個redis的客戶端來模擬生產者,自帶的客戶端就能夠。多往tasklist 隊列裏面塞上一些數據。
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> lpush tasklist 'im task 03'
redis> lpush tasklist 'im task 04'
redis> lpush tasklist 'im task 05'
隨後在消費者端便會看到這些模擬出來的任務被挨個消費掉。
2.簡單優先級的隊列
假設一種簡單的需求,只須要高優先級的比低優先級的任務率先處理掉。其餘任務之間的順序一律無論,這種咱們只須要在在遇到高優先級任務的時候將它塞到隊列的前頭,而不是push到最後面便可。
由於咱們的隊列是使用的redis的 list,因此很容易實現。遇到高優先級的使用rpush 遇到低優先級的使用lpush
redis> lpush tasklist 'im task 01'
redis> lpush tasklist 'im task 02'
redis> rpush tasklist 'im high task 01'
redis> rpush tasklist 'im high task 01'
redis> lpush tasklist 'im task 03'
redis> rpush tasklist 'im high task 03'
隨後會看到,高優先級的老是比低優先級的率先執行。可是這個方案的缺點是高優先級的任務之間的執行順序是先進後出的。
3.較爲完善的隊列
例子2中只是簡單的將高優先級的任務塞到隊列最前面,低優先級的塞到最後面。這樣保證不了高優先級任務之間的順序。
假設當全部的任務都是高優先級的話,那麼他們的執行順序將是相反的。這樣明顯違背了隊列的FIFO原則。
不過只要稍加改進就能夠完善咱們的隊列。
跟使用rabbitmq同樣,咱們設置兩個隊列,一個高優先級一個低優先級的隊列。高優先級任務放到高隊列中,低的放在低優先隊列中。redis和rabbitmq不一樣的是它能夠要求隊列消費者從哪一個隊列裏面先讀。
def main():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
r = redis.Redis(connection_pool=pool)
while 1:
result = r.brpop(['high_task_queue', 'low_task_queue'], 0)
handle(result[1])
上面的代碼,會阻塞地從'high_task_queue', 'low_task_queue'這兩個隊列裏面取數據,若是第一個沒有再從第二個裏面取。
因此只須要將隊列消費者作這樣的改進即可以達到目的。
複製代碼
redis> lpush low_task_queue low001
redis> lpush low_task_queue low002
redis> lpush low_task_queue low003
redis> lpush low_task_queue low004
redis> lpush high_task_queue low001
redis> lpush high_task_queue low002
redis> lpush high_task_queue low003
redis> lpush high_task_queue low004
複製代碼
經過上面的測試看到,高優先級的會被率先執行,而且高優先級之間也是保證了FIFO的原則。
這種方案咱們能夠支持不一樣階段的優先級隊列,例如高中低三個級別或者更多的級別均可以。
4.優先級級別不少的狀況
假設有個這樣的需求,優先級不是簡單的高中低或者0-10這些固定的級別。而是相似0-99999這麼多級別。那麼咱們第三種方案將不太合適了。
雖然redis有sorted set這樣的能夠排序的數據類型,看是很惋惜它沒有阻塞版的接口。因而咱們仍是隻能使用list類型經過其餘方式來完成目的。
有個簡單的作法咱們能夠只設置一個隊列,並保證它是按照優先級排序號的。而後經過二分查找法查找一個任務合適的位置,並經過 lset 命令插入到相應的位置。
例如隊列裏面包含着寫優先級的任務[1, 3, 6, 8, 9, 14],當有個優先級爲7的任務過來,咱們經過本身的二分算法一個個從隊列裏面取數據出來反和目標數據比對,計算出相應的位置而後插入到指定地點便可。
由於二分查找是比較快的,而且redis自己也都在內存中,理論上速度是能夠保證的。可是若是說數據量確實很大的話咱們也能夠經過一些方式來調優。
回想咱們第三種方案,把第三種方案結合起來就會很大程度上減小開銷。例如數據量十萬的隊列,它們的優先級也是隨機0-十萬的區間。咱們能夠設置 10個或者100個不一樣的隊列,0-一萬的優先級任務投放到1號隊列,一萬-二萬的任務投放到2號隊列。這樣將一個隊列按不一樣等級拆分後它單個隊列的數據 就減小許多,這樣二分查找匹配的效率也會高一點。可是數據所佔的資源基本是不變的,十萬數據該佔多少內存仍是多少。只是
系統裏面多了一些隊列而已。