在文章RabbitMQ入門(一)之Hello World,咱們編寫程序經過指定的隊列來發送和接受消息。在本文中,咱們將會建立工做隊列
(Work Queue),經過多個workers來分配耗時任務。
工做隊列(Work Queue,也被成爲Task Queue,任務隊列)的中心思想是,避免當即執行一個資源消耗巨大且必須等待其完成的任務。相反地,咱們調度好隊列能夠安排該任務稍後執行。咱們將一個任務(task)封裝成一個消息,將它發送至隊列。一個在後臺運行的work進程將會拋出該任務,並最終執行該任務。當你運行多個workers的時候,任務將會在它們之中共享。
這個概念在web開發中頗有用,由於經過一個短的HTTP請求不可能處理複雜的任務。
在以前的文章中,咱們發送了一個包含「Hello World!」的消息。如今咱們將會發送表明複雜任務的字串符。咱們並無實際上的任務,好比從新調整圖片的尺寸或者渲染PDF,咱們僞裝有這樣的複雜任務,經過使用time.sleep()
函數。咱們將會用字符串中的點(.)來表明複雜度;每個點表明一秒中的任務。舉例來講,字符串Hello...
須要花費三秒。
咱們須要稍微修改下sent.py
中的代碼,容許在命令中輸入任意字符串。該程序會調度任務至工做隊列,所以命名爲new_task.py
:python
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print(" [x] Sent %r" % message)
咱們原先的receive.py
也須要改動:它須要在消息體中將字符串的每個點表明1秒鐘的任務。它會從隊列中拋出消息並執行該消息,所以命名爲task.py
:mysql
import time def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done")
使用工做隊列的一個好處就是它可以輕鬆實現並行工做。若是咱們建立了一項積壓的工做,那麼咱們能夠增長更多的worker來使它的擴展性更好。
首先,咱們同時運行兩個worker.py
腳本。他們都可以從隊列中獲取消息,可是具體是怎麼實現的呢?讓咱們接着閱讀。
你須要打開三個終端查看。兩個終端用於運行worker.py
腳本。這兩個終端將會成爲兩個消費者——C1和C2。web
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
在第三個終端中,咱們將會產生新的任務。一旦你啓動了這些消費者,你就能夠發送一些消息了:sql
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
讓咱們看看這兩個workers傳遞了什麼:shell
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
RabbitMQ默認會將每一個消息依次地發送給下一個消費者。所以總的來講,每一個消費者將會一樣數量的消息。這種消息分配的方法叫Round-Robin
。你能夠嘗試三個或者更多的worker。數據庫
執行一項任務須要花費幾秒鐘。你也許會好奇,若是其中一個消費者執行一項耗時很長的任務,而且在執行了一部分的時候掛掉了,將會發生什麼?根據咱們如今的代碼,一旦RabbitMQ將消息傳送至消費者,那麼RabbitMQ就會標誌它爲刪除狀態。在這種狀況下,若是咱們殺死某個worker,咱們將會失去他正在處理的消息。咱們也會失去全部分配至這個worker的消息,固然,這些消息還未被處理。
可是,咱們不但願失去任何一項任務。若是有一個worker掛掉了,咱們但願這些任務可以被傳送至另外一個worker。
爲了確保消息不丟失,RabbitMQ支持消息確認
。一個ack(nowledgement)是由消費者發送回來的,用於告訴RabbitMQ,這個特定的消息已經被接受,被處理,能夠被刪除了。
若是一個消費者掛了(它的channel關閉了,鏈接關閉了,或者TCP鏈接丟失)可是沒有發送一個ack,RabbitMQ就會知道這個消息並未被徹底處理,會將它從新塞進隊列。若是同時還存在着其餘在線消費者,RabbbitMQ將會將這個消息從新傳送給另外一個消費者。用這種方式能夠確保沒有消息丟失,即便workers偶爾會刮掉。
並不存在消息超時;若是消費者掛了,RabbitMQ將會從新傳送消息。這樣即便處理一個消息須要消耗很長很長的時間,也是能夠的。
默認的消息確認
方式爲人工消息確認
。在咱們以前的例子中,咱們清晰地將它關閉了,使用了auto_ack=True
這個命令。當咱們完成一項任務的時候,根據須要,移除這個標誌,從worker中發送一個合適的確認。json
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep( body.count('.') ) print(" [x] Done") ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)
使用上述代碼,咱們能夠確保,即便咱們使用CTRL+C
命令殺死了一個正在處理消息的woker,也不會丟失什麼。這個worker掛掉後不久,全部未確認的消息將會被從新傳送。
消息確認
必須在同一個傳輸消息的channel中發送。嘗試着在不一樣的channel中進行消息確認將會引起channel-level protocol exception。bash
咱們已經學習瞭如何在消費者掛掉的狀況下,任務不會丟失。可是,當RabbitMQ server中止時,咱們的任務仍然會丟失。
當RabbitMQ中止或崩潰時,它將會忘記全部的隊列和消息,除非你告訴它不這麼作。在這種狀況下,須要作兩個事情確保消息不會丟失:咱們須要將隊列和消息都設置爲持久化。
首先,咱們須要確保RabbitMQ不會丟失隊列。爲了實現這個,咱們須要將隊列聲明爲持久化:app
channel.queue_declare(queue='hello', durable=True)
儘管這個命令是正確的,但他仍會不會起做用。這是由於,咱們已經建立了一個叫爲hello
的非持久化隊列。RabbitMQ不容許你從新定義一個已經存在的隊列而參數不同,全部這樣作的程序只會引起錯誤。可是有一個快速的應變辦法——咱們能夠建立一個不一樣名稱的隊列,好比task_queue
:dom
channel.queue_declare(queue='task_queue', durable=True)
queue_declare
須要同時應用於生產者和消費者。
在這點上咱們能夠確保task_queue
隊列不會丟失消息即便RabbitMQ重啓。如今,咱們須要聲明消息爲持久化——將delivery_mode
這個參數設置爲2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
你也許注意到了,剛纔的消息分發機制並不會嚴格地按照咱們所但願的方式進行。舉這樣一個例子,設想有兩個worker,而全部的奇數消息都很重而偶數消息都是輕量級的,這樣其中一個worker就會一直很忙而另外一個worker幾乎不作什麼工做。然而,RabbitMQ對此一無所知,它仍然會平均分配消息。
這種狀況的發生是由於RabbitMQ僅僅是當消息進入隊列的時候就會分發這個消息。它並不會注意消費者所接收的未確認的消息數量。它盲目地將第n個消息發送至第n個消費者。
爲了克服這種狀況,咱們能夠在basic.qos
方法中設置prefetch_count=1
。這就告訴RabbitMQ一次不要將多於一個的消息發送給一個worker。換句話說,不要分發一個新的消息給worker除非這個worker已經處理好以前的消息而且進行了消息確認。也就說,RabbitMQ將會將這個消息分發給下一個不是很忙的worker。
channel.basic_qos(prefetch_count=1)
爲了對上面的例子有一個好的理解,咱們須要寫代碼進行實際操練一下。
生產者new_task.py
的代碼以下:
# -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者worker.py
的完整代碼以下:
# -*- coding: utf-8 -*- import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
開啓三個終端,消息的發送和接收狀況以下:
若是咱們停掉其中一個worker,那麼消息的接收狀況以下:
能夠看到,如今全部發送的消息都會被這個仍在工做的worker接收到。
接下來,咱們將會使用RabbitMQ的這種工做隊列的方式往MySQL數據庫中的表插入數據。
數據庫爲orm_test,表格爲exam_user,表結構以下:
接下來,咱們須要往這張表中插入隨機建立的數據。若是咱們利用Python的第三方模塊pymysql,每一次插入一條記錄,那麼一分鐘插入53237條記錄。
利用RabbitMQ,咱們的生產者代碼以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for id in range(1, 20000001): name = choice(names) place = choice(places) type2 = choice(types) message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
消費者代碼以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import time import pymysql # 打開數據庫鏈接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法建立一個遊標對象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) cursor.execute(body) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
咱們開啓9個終端,其中8個消費者1個生產者,先啓動消費者,而後生產者,按照上面的數據導入方式,一分鐘插入了133084條記錄,是普通方式的2.50倍,效率有大幅度提高!
讓咱們稍微修改下生產者和消費者的代碼,一次提交插入多條記錄,減小每提交一次就插入一條記錄的消耗時間。新的生產者代碼以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:23 import pika from random import choice import json names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark', 'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David', 'Ella', 'Lake', 'Moon', 'Nake', 'Zoo'] places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao'] types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008', 'DG009', 'DG010', 'DG020'] connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) for _ in range(1, 200001): values = [] for i in range(100): name = choice(names) place = choice(places) type2 = choice(types) values.append([100*_+i+1, name, place, type2]) message = json.dumps(values) channel.basic_publish( exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) print(" [x] Sent %r" % message) connection.close()
新的消費者的代碼以下:
# -*- coding: utf-8 -*- # author: Jclian91 # place: Pudong Shanghai # time: 2020-01-13 23:28 # -*- coding: utf-8 -*- # author: Jclian91 # place: Sanya Hainan # time: 2020-01-12 13:45 import pika import json import time import pymysql # 打開數據庫鏈接 db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test") # 使用 cursor() 方法建立一個遊標對象 cursor cursor = db.cursor() connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] Received %r" % body) sql = 'insert into exam_users values(%s, %s, %s, %s)' cursor.executemany(sql, json.loads(body)) db.commit() print(" [x] Insert successfully!") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(queue='task_queue', on_message_callback=callback) channel.start_consuming()
跟剛纔同樣,咱們開啓9個終端,其中8個消費者1個生產者,先啓動消費者,而後生產者,按照上面的數據導入方式,一分鐘插入了3170600條記錄,是普通方式的59.56倍,是先前一次只提交一條記錄的插入方式的23.82倍。這樣的提速無疑是很是驚人的!
固然還有更高效的數據插入方法,本文的方法僅僅是爲了演示RabbitMQ的工做隊列以及在插入數據方面的提速。
本次分享到此結束,感謝你們閱讀~