RabbitMQ入門(二)工做隊列

  在文章RabbitMQ入門(一)之Hello World,咱們編寫程序經過指定的隊列來發送和接受消息。在本文中,咱們將會建立工做隊列(Work Queue),經過多個workers來分配耗時任務。
  工做隊列(Work Queue,也被成爲Task Queue,任務隊列)的中心思想是,避免當即執行一個資源消耗巨大且必須等待其完成的任務。相反地,咱們調度好隊列能夠安排該任務稍後執行。咱們將一個任務(task)封裝成一個消息,將它發送至隊列。一個在後臺運行的work進程將會拋出該任務,並最終執行該任務。當你運行多個workers的時候,任務將會在它們之中共享。
  這個概念在web開發中頗有用,由於經過一個短的HTTP請求不可能處理複雜的任務。
  在以前的文章中,咱們發送了一個包含「Hello World!」的消息。如今咱們將會發送表明複雜任務的字串符。咱們並無實際上的任務,好比從新調整圖片的尺寸或者渲染PDF,咱們僞裝有這樣的複雜任務,經過使用time.sleep()函數。咱們將會用字符串中的點(.)來表明複雜度;每個點表明一秒中的任務。舉例來講,字符串Hello...須要花費三秒。
  咱們須要稍微修改下sent.py中的代碼,容許在命令中輸入任意字符串。該程序會調度任務至工做隊列,所以命名爲new_task.pypython

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

  咱們原先的receive.py也須要改動:它須要在消息體中將字符串的每個點表明1秒鐘的任務。它會從隊列中拋出消息並執行該消息,所以命名爲task.pymysql

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

Round-Robin分發(輪詢分發)

  使用工做隊列的一個好處就是它可以輕鬆實現並行工做。若是咱們建立了一項積壓的工做,那麼咱們能夠增長更多的worker來使它的擴展性更好。
  首先,咱們同時運行兩個worker.py腳本。他們都可以從隊列中獲取消息,可是具體是怎麼實現的呢?讓咱們接着閱讀。
  你須要打開三個終端查看。兩個終端用於運行worker.py腳本。這兩個終端將會成爲兩個消費者——C1和C2。web

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

在第三個終端中,咱們將會產生新的任務。一旦你啓動了這些消費者,你就能夠發送一些消息了:sql

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

讓咱們看看這兩個workers傳遞了什麼:shell

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

RabbitMQ默認會將每一個消息依次地發送給下一個消費者。所以總的來講,每一個消費者將會一樣數量的消息。這種消息分配的方法叫Round-Robin。你能夠嘗試三個或者更多的worker。數據庫

消息確認(Message Acknowledgement)

  執行一項任務須要花費幾秒鐘。你也許會好奇,若是其中一個消費者執行一項耗時很長的任務,而且在執行了一部分的時候掛掉了,將會發生什麼?根據咱們如今的代碼,一旦RabbitMQ將消息傳送至消費者,那麼RabbitMQ就會標誌它爲刪除狀態。在這種狀況下,若是咱們殺死某個worker,咱們將會失去他正在處理的消息。咱們也會失去全部分配至這個worker的消息,固然,這些消息還未被處理。
  可是,咱們不但願失去任何一項任務。若是有一個worker掛掉了,咱們但願這些任務可以被傳送至另外一個worker。
  爲了確保消息不丟失,RabbitMQ支持消息確認。一個ack(nowledgement)是由消費者發送回來的,用於告訴RabbitMQ,這個特定的消息已經被接受,被處理,能夠被刪除了。
  若是一個消費者掛了(它的channel關閉了,鏈接關閉了,或者TCP鏈接丟失)可是沒有發送一個ack,RabbitMQ就會知道這個消息並未被徹底處理,會將它從新塞進隊列。若是同時還存在着其餘在線消費者,RabbbitMQ將會將這個消息從新傳送給另外一個消費者。用這種方式能夠確保沒有消息丟失,即便workers偶爾會刮掉。
  並不存在消息超時;若是消費者掛了,RabbitMQ將會從新傳送消息。這樣即便處理一個消息須要消耗很長很長的時間,也是能夠的。
  默認的消息確認方式爲人工消息確認。在咱們以前的例子中,咱們清晰地將它關閉了,使用了auto_ack=True這個命令。當咱們完成一項任務的時候,根據須要,移除這個標誌,從worker中發送一個合適的確認。json

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep( body.count('.') )
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

使用上述代碼,咱們能夠確保,即便咱們使用CTRL+C命令殺死了一個正在處理消息的woker,也不會丟失什麼。這個worker掛掉後不久,全部未確認的消息將會被從新傳送。
  消息確認必須在同一個傳輸消息的channel中發送。嘗試着在不一樣的channel中進行消息確認將會引起channel-level protocol exception。bash

消息持久化(Message Durability)

  咱們已經學習瞭如何在消費者掛掉的狀況下,任務不會丟失。可是,當RabbitMQ server中止時,咱們的任務仍然會丟失。
  當RabbitMQ中止或崩潰時,它將會忘記全部的隊列和消息,除非你告訴它不這麼作。在這種狀況下,須要作兩個事情確保消息不會丟失:咱們須要將隊列和消息都設置爲持久化。
  首先,咱們須要確保RabbitMQ不會丟失隊列。爲了實現這個,咱們須要將隊列聲明爲持久化:app

channel.queue_declare(queue='hello', durable=True)

儘管這個命令是正確的,但他仍會不會起做用。這是由於,咱們已經建立了一個叫爲hello的非持久化隊列。RabbitMQ不容許你從新定義一個已經存在的隊列而參數不同,全部這樣作的程序只會引起錯誤。可是有一個快速的應變辦法——咱們能夠建立一個不一樣名稱的隊列,好比task_queuedom

channel.queue_declare(queue='task_queue', durable=True)

queue_declare須要同時應用於生產者和消費者。
  在這點上咱們能夠確保task_queue隊列不會丟失消息即便RabbitMQ重啓。如今,咱們須要聲明消息爲持久化——將delivery_mode這個參數設置爲2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

公平分發(Fair Dispatch)

  你也許注意到了,剛纔的消息分發機制並不會嚴格地按照咱們所但願的方式進行。舉這樣一個例子,設想有兩個worker,而全部的奇數消息都很重而偶數消息都是輕量級的,這樣其中一個worker就會一直很忙而另外一個worker幾乎不作什麼工做。然而,RabbitMQ對此一無所知,它仍然會平均分配消息。
  這種狀況的發生是由於RabbitMQ僅僅是當消息進入隊列的時候就會分發這個消息。它並不會注意消費者所接收的未確認的消息數量。它盲目地將第n個消息發送至第n個消費者。

  爲了克服這種狀況,咱們能夠在basic.qos方法中設置prefetch_count=1。這就告訴RabbitMQ一次不要將多於一個的消息發送給一個worker。換句話說,不要分發一個新的消息給worker除非這個worker已經處理好以前的消息而且進行了消息確認。也就說,RabbitMQ將會將這個消息分發給下一個不是很忙的worker。

channel.basic_qos(prefetch_count=1)

實戰1

  爲了對上面的例子有一個好的理解,咱們須要寫代碼進行實際操練一下。
  生產者new_task.py的代碼以下:

# -*- coding: utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent
    ))
print(" [x] Sent %r" % message)
connection.close()

  消費者worker.py的完整代碼以下:

# -*- coding: utf-8 -*-

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

  開啓三個終端,消息的發送和接收狀況以下:
message發送與接收狀況
  若是咱們停掉其中一個worker,那麼消息的接收狀況以下:
其中一個worker掛掉
能夠看到,如今全部發送的消息都會被這個仍在工做的worker接收到。

實戰2

  接下來,咱們將會使用RabbitMQ的這種工做隊列的方式往MySQL數據庫中的表插入數據。
  數據庫爲orm_test,表格爲exam_user,表結構以下:

exam_user數據庫表結構

  接下來,咱們須要往這張表中插入隨機建立的數據。若是咱們利用Python的第三方模塊pymysql,每一次插入一條記錄,那麼一分鐘插入53237條記錄。
  利用RabbitMQ,咱們的生產者代碼以下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for id in range(1, 20000001):
    name = choice(names)
    place = choice(places)
    type2 = choice(types)
    message = "insert into exam_users values(%s, '%s', '%s', '%s');" % (id, name, place, type2)

    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)
connection.close()

  消費者代碼以下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import time
import pymysql

# 打開數據庫鏈接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法建立一個遊標對象 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    cursor.execute(body)
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

咱們開啓9個終端,其中8個消費者1個生產者,先啓動消費者,而後生產者,按照上面的數據導入方式,一分鐘插入了133084條記錄,是普通方式的2.50倍,效率有大幅度提高!
  讓咱們稍微修改下生產者和消費者的代碼,一次提交插入多條記錄,減小每提交一次就插入一條記錄的消耗時間。新的生產者代碼以下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:23
import pika
from random import choice
import json

names = ['Jack', 'Rose', 'Mark', 'Hill', 'Docker', 'Lilei', 'Lee', 'Bruce', 'Dark',
         'Super', 'Cell', 'Fail', 'Suceess', 'Su', 'Alex', 'Bob', 'Cook', 'David',
         'Ella', 'Lake', 'Moon', 'Nake', 'Zoo']
places = ['Beijing', 'Shanghai', 'Guangzhou', 'Dalian', 'Qingdao']
types = ['DG001', 'DG002', 'DG003', 'DG004', 'DG005', 'DG006', 'DG007', 'DG008',
         'DG009', 'DG010', 'DG020']


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

for _ in range(1, 200001):

    values = []
    for i in range(100):
        name = choice(names)
        place = choice(places)
        type2 = choice(types)
        values.append([100*_+i+1, name, place, type2])
    message = json.dumps(values)


    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        ))
    print(" [x] Sent %r" % message)

connection.close()

  新的消費者的代碼以下:

# -*- coding: utf-8 -*-
# author: Jclian91
# place: Pudong Shanghai
# time: 2020-01-13 23:28
# -*- coding: utf-8 -*-
# author: Jclian91
# place: Sanya Hainan
# time: 2020-01-12 13:45
import pika
import json
import time
import pymysql

# 打開數據庫鏈接
db = pymysql.connect(host="localhost", port=3306, user="root", password="", db="orm_test")

# 使用 cursor() 方法建立一個遊標對象 cursor
cursor = db.cursor()

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sql = 'insert into exam_users values(%s, %s, %s, %s)'
    cursor.executemany(sql, json.loads(body))
    db.commit()
    print(" [x] Insert successfully!")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

跟剛纔同樣,咱們開啓9個終端,其中8個消費者1個生產者,先啓動消費者,而後生產者,按照上面的數據導入方式,一分鐘插入了3170600條記錄,是普通方式的59.56倍,是先前一次只提交一條記錄的插入方式的23.82倍。這樣的提速無疑是很是驚人的!
  固然還有更高效的數據插入方法,本文的方法僅僅是爲了演示RabbitMQ的工做隊列以及在插入數據方面的提速。

  本次分享到此結束,感謝你們閱讀~

相關文章
相關標籤/搜索