源碼:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在第一篇教程中,咱們編寫了用於從命名隊列發送和接收消息的程序。在這一個中,咱們將建立一個工做隊列,用於在多個工做人員之間分配耗時的任務。github
工做隊列(又名:任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。相反,咱們安排稍後完成任務。咱們將任務封裝 爲消息並將其發送到隊列。
在後臺運行的工做進程將彈出任務並最終執行做業。當你運行許多工人時,任務將在他們之間共享。服務器
這個概念在Web應用程序中特別有用,由於在短的HTTP請求窗口中沒法處理複雜的任務。函數
在本教程的前一部分中,咱們發送了一條包含「Hello World!」的消息。如今咱們將發送表明複雜任務的字符串。
咱們沒有真實世界的任務,好比要調整大小的圖像或要渲染的PDF文件,因此讓咱們僞裝咱們很忙 - 使用 time.sleep()
函數來假裝它。
咱們將把字符串中的點(".")數做爲複雜度; 每個點都會佔用一秒的「工做」。例如,Hello ... 描述的假任務將須要三秒鐘。fetch
咱們稍微修改前面例子中的send.py代碼,以容許從命令行發送任意消息。這個程序將把任務安排到咱們的工做隊列中,因此讓咱們把它命名爲new_task.py:spa
import sys message = ' '.join(sys.argv[1:]) or 'Hello World' channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % message)
咱們的舊版receive.py腳本也須要進行一些更改:它須要爲郵件正文中的每一個點僞造第二個工做。它會從隊列中彈出消息並執行任務,因此咱們稱之爲worker.py:命令行
import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")
使用任務隊列的優勢之一是能夠輕鬆地平行工做。若是咱們正在積累積壓的工做,咱們能夠增長更多的工做人員,而且這種方式很容易擴展。code
首先,咱們試着同時運行兩個worker.py腳本。他們都會從隊列中獲取消息,但具體究竟是什麼?讓咱們來看看。教程
您須要打開三個控制檯。兩個將運行worker.py腳本。這些控制檯將成爲咱們的兩個消費者 - C1和C2。
默認狀況下,RabbitMQ將按順序將每條消息發送給下一個使用者。平均而言,每一個消費者將得到相同數量的消息。這種分配消息的方式稱爲循環法。請嘗試與三名或更多的工人。
作任務可能須要幾秒鐘的時間。你可能想知道若是其中一個消費者開始一項長期任務而且只是部分完成而死亡會發生什麼。
用咱們目前的代碼,一旦RabbitMQ將消息傳遞給客戶,它當即將其標記爲刪除。在這種狀況下,若是你殺了一個工人,咱們將失去剛剛處理的信息。
咱們也會失去全部派發給這個特定工做人員但還沒有處理的消息。
但咱們不想失去任何任務。若是一名工人死亡,咱們但願將任務交付給另外一名工人。
爲了確保消息永不丟失,RabbitMQ支持消息確認。消費者發回ack(請求)告訴RabbitMQ已經收到,處理了特定的消息,而且RabbitMQ能夠自由刪除它。
若是消費者死亡(其通道關閉,鏈接關閉或TCP鏈接丟失),RabbitMQ將理解消息未被徹底處理,並將從新排隊。若是有其餘消費者同時在線,它會迅速將其從新發送給另外一位消費者。
這樣,即便工做人員偶爾死亡,也能夠確保沒有任何信息丟失。
沒有任何消息超時; 當消費者死亡時,RabbitMQ將從新傳遞消息。即便處理消息須要很是很長的時間也不要緊。
消息確認默認是被打開的。在前面的例子中,咱們經過 no_ack = True 標誌明確地將它們關閉。一旦咱們完成了一項任務,如今是時候清除這個標誌而且發送工人的正確確認。
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')
使用這段代碼,咱們能夠肯定,即便在處理消息時使用CTRL + C來殺死一個工做者,也不會丟失任何東西。工人死後不久,全部未確認的消息將被從新發送。
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是若是RabbitMQ服務器中止,咱們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會忘記隊列和消息,除非您告訴它不要。須要作兩件事來確保消息不會丟失:咱們須要將隊列和消息標記爲持久。
首先,咱們須要確保RabbitMQ永遠不會失去咱們的隊列。爲了作到這一點,咱們須要宣佈它是持久的:
channel.queue_declare(queue='hello', durable=True)
雖然這個命令自己是正確的,但它在咱們的設置中不起做用。那是由於咱們已經定義了一個名爲hello的隊列 ,這個隊列並不"耐用"。
RabbitMQ不容許您使用不一樣的參數從新定義現有的隊列,並會向任何試圖執行該操做的程序返回錯誤。
可是有一個快速的解決方法 - 讓咱們聲明一個具備不一樣名稱的隊列,例如task_queue:
channel.queue_declare(queue='task_queue', durable=True)
此queue_declare更改須要應用於生產者和消費者代碼。
此時咱們確信,即便RabbitMQ從新啓動,task_queue隊列也不會丟失。如今咱們須要將消息標記爲持久 - 經過提供值爲2的delivery_mode屬性。
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 ))
您可能已經注意到調度仍然沒法徹底按照咱們的要求工做。例如,在有兩名工人的狀況下,當全部奇怪的信息都很重,甚至信息不多時,一名工做人員會一直很忙,
另外一名工做人員幾乎不會作任何工做。那麼,RabbitMQ不知道任何有關這一點,並仍將均勻地發送消息。
發生這種狀況是由於RabbitMQ只在消息進入隊列時調度消息。它沒有考慮消費者未確認消息的數量。它只是盲目地將第n條消息分發給第n位消費者。
爲了解決這個問題,咱們可使用basic.qos方法和設置prefetch_count = 1。這告訴RabbitMQ一次不要向工做人員發送多個消息。
或者換句話說,不要向工做人員發送新消息,直到它處理並確認了前一個消息。相反,它會將其分派給不是仍然忙碌的下一個工做人員。
channel.basic_qos(prefetch_count=1)
咱們的new_task.py腳本的最終代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or 'Hello World' channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 確保消息是持久的 )) print(" [x] Sent %r" % message) connection.close()
而咱們的工人 worker.py:
#!/usr/bin/env python import time import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='hello') channel.basic_qos(prefetch_count=1) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
使用消息確認和prefetch_count,您能夠設置一個工做隊列。即便RabbitMQ從新啓動,持久性選項也可以讓任務繼續存在。