在處理Web客戶端發送的命令請求時,某些操做的執行時間可能會比咱們預期的更長一些。經過將待執行任務的相關信息放入隊列裏面,並在以後對隊列進行處理,用戶能夠推遲那些須要一段時間才能完成的操做,這種工做交給任務處理器來執行的作法被稱爲任務隊列(task queue)
。如今有不少專門的任務隊列軟件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS),接下來實現兩種不一樣類型的任務隊列,第一種隊列會根據任務被插入隊列的順序來儘快地執行任務,第二種隊列具備安排任務在將來某個特定時間執行的能力。json
除了任務隊列以外,還有先進先出(FIFO)隊列、後進後出(LIFO)隊列和優先級(priority)隊列。
使用任務隊列來記錄郵件的收信人以及發送郵件的緣由,並構建一個能夠在郵件發送服務器運行變得緩慢的時候,以並行方式一次發送多封郵件的工做進程(worker process)。安全
要編寫的隊列將以「先到先服務」(first-come,first-served)的方式發送郵件,而且不管發送是否成功,程序都會把發送結果記錄到日誌裏面。Redis的列表結構容許用戶經過RPUSH和LPUSH以及RPOP和LPOP,從列表的兩端推入和彈出元素。郵件隊列使用RPUSH命令來將待發送的郵件推入列表的右端,而且由於工做進程除了發送郵件以外不須要執行其餘工做,因此它將使用阻塞版本的彈出命令BLPOP從隊列中彈出待發送的郵件,而命令的最大阻塞時限爲30秒。服務器
郵件隊列由一個Redis列表構成,包含多個JSON編碼對象。爲了將待發送的郵件推入隊列裏面,程序會獲取發送郵件所需的所有信息,並將這些信息序列化爲JSON對象,最後使用RPUSH命令將JSON對象推入郵件隊列裏面。網絡
def send_sold_email_via_queue(conn, seller, item, price, buyer): data = { 'seller_id': seller, 'item_id': item, 'price': price, 'buyer_id': buyer, 'time': time.time() } conn.rpush('queue:email', json.dumps(data))
從隊列裏獲取待發送郵件,程序首先使用BLPOP命令從郵件隊列裏面彈出一個JSON對象,接着經過解碼JSON對象來取得待發送郵件的相關信息,最後根據這些信息來發送郵件。ide
def process_sold_email_queue(conn): while not QUIT: packed = conn.blpop(['queue:email'], 30) //獲取一封待發送郵件 if not packed: //隊列裏面暫時尚未待發送郵件,重試 continue to_send = json.loads(packed[1]) //從JSON對象中解碼出郵件信息 try: fetch_data_and_send_sold_email(to_send) except EmailSendError as err: log_error("Failed to send sold email", err, to_send) else: log_success("Send sold email", to_send)
由於BLPOP命令每次只會從隊列裏面彈出一封待發送郵件,因此待發送郵件不會出現重複,也不會被重複發送。而且由於隊列只會存放待發送郵件,因此工做進程要處理的任務是很是單一的。下面代碼的工做進程會監視用戶提供的多個隊列,並從多個已知的已註冊回調函數裏面,選出一個函數來處理JSON編碼的函數調用。函數
def worker_watch_queue(conn, queue, callback): while not QUIT: packed = conn.blpop([queue], 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: //沒有找到任務指定的回調函數,用日誌記錄錯誤並重試 log_error("Unknown callback %s"%name) continue callbacks[name](*args) //執行任務
在使用隊列的時候,程序可能會須要讓特定的操做優先於其餘操做執行。fetch
假設如今咱們須要爲任務設置高、中、低3種優先級別,其中:高優先級任務在出現以後會第一時間被執行,而中等優先級任務則會在沒有任何高優先級任務存在的狀況下被執行,而低優先級任務則會在既沒有任何高優先級任務,又沒有任何中等優先級任務的狀況下被執行。ui
def worker_watch_queues(conn, queues, callbacks): while not QUIT: packed = conn.blpop(queues, 30) if not packed: continue name, args = json.loads(packed[1]) if name not in callbacks: log_error("Unknown callback %s"%name) continue callbacks[name](*args)
同時使用多個隊列能夠下降實現優先級特性的難度。除此以外,多隊列有時候也會被用於分隔不一樣的任務(如同一個隊列存放公告郵件,而另外一個隊列則存放提醒郵件),在這種狀況下,處理不一樣隊列時可能出現不公平現象。爲此,咱們能夠偶爾從新排列各個隊列的順序,使得針對隊列的處理操做變得更公平一些,當某個隊列的增加速度比其餘隊列的增加速度快的時候,這種重拍操做尤其重要。編碼
使用列表結構能夠實現只能執行一種任務的隊列,也能夠實現經過調用不一樣回調函數來執行不一樣任務的隊列,甚至還能夠實現簡單的優先級隊列。
如下3種方法能夠爲隊列中的任務添加延遲性質:日誌
由於不管是進行短暫的等待,仍是將任務從新推入隊列裏面,都會浪費工做進程的時間,因此咱們不會採用第一種方法。此外,由於工做進程可能會由於崩潰而丟失本地記錄的全部待執行任務,因此咱們也不會採用第二種方法。最後,由於使用有序集合的第三種方法最簡單直接,因此咱們將採起這一方法,並使用鎖來保證任務從有序集合移動到任務隊列時的安全性。
有序集合隊列(ZSET queue)存儲的每一個被延遲的任務都是一個包含4個值的JSON列表,這4個分值分別是:惟一標識符、處理任務隊列的名字、處理任務的回調函數的名字、傳給回調函數的參數。在有序集合裏面,任務的分值會被設置爲任務的執行時間,而當即可執行的任務將被直接插入任務隊列裏面。下面代碼展現了建立延遲任務(任務是否延遲是可選的,只要把任務的延遲時間設置爲0就能夠建立一個當即執行的任務)。
def execute_later(conn, queue, name, args, delay=0): identifier = str(uuid.uuid4()) item = json.dumps([identifier, queue, name, args]) if delay > 0: conn.zadd('delayed:', item, time.time() + delay) else: conn.rpush('queue:' + queue, item) return identifier
由於Redis沒有提供直接的方法能夠阻塞有序集合直到元素的分值低於當前UNIX時間戳爲止,因此咱們須要本身來查找有序集合裏面分值低於當前UNIX時間戳的任務。由於全部被延遲的任務都存儲在同一個有序集合隊列裏面,因此程序只須要獲取有序集合裏面排名第一的元素以及該元素的分值就能夠了:若是隊列裏面沒有任何任務,或者任務的執行時間還沒有來臨,那麼程序將在短暫等待以後重試;若是任務的執行時間已到,那麼程序將根據任務包含的標識符來獲取一個細粒度鎖,接着從有序集合裏面移除要被執行的任務,並將它添加到適當的任務隊列裏面。經過將可執行的任務添加到任務隊列裏面而不是直接執行它們,咱們能夠把獲取可執行任務的進程數量限制在一兩個以內,而沒必要根據工做進程的數量來決定運行多少個獲取進程,這減小了獲取可執行任務所需的花銷。
def poll_queue(conn): while not QUIT: item = conn.zrange('delayed:', 0, 0, withscores=True) if not item or item[0][1] > time.time(): time.sleep(.01) continue item = item[0][0] identifier, queue, function, args = json.loads(item) locked = acquire_lock(conn, identifier) if not locked: continue if conn.zrem('delayed:', item): conn.rpush('queue:' + queue, item) release_lock(conn, identifier, locked)
由於有序集合並不具有像列表那樣的阻塞彈出機制,因此程序須要不斷地進行循環,並嘗試從隊列裏面獲取要被執行的任務,雖然這一操做會增大網絡和處理器的負載,但由於咱們只會運行一兩個這樣的程序,因此不會消耗太多資源。