上一篇文章結尾我留了一個問題,就是在work2中設no_ack=Truepython
出現那樣的結果是由於server一次分發多個結果給work2,work2又在執行完了之後沒有發送ack確認執行結束,server端是根據connection是否存在來判斷work2是否stoped,因爲鏈接存在,因此server認爲work2正在執行,其實此時work2已經執行結束,正在等待server發送消息。。。fetch
一:spa
task是被加載到內存中的,要避免server崩潰致使的task丟失,固然想到的辦法就是持久化,將task保存到硬盤上code
在定義Queue的時候,設置屬性durable=Trueorm
channel.queue_declare(=,=)
與此同時須要告知生產者,queue是被持久化在硬盤上的server
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
二:內存
如何解決server一次分發多個任務的狀況,也能夠經過設置屬性it
channel.basic_qos(prefetch_count=1)
new_task.py:io
# -*- coding: UTF-8 -*- import pika if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.queue_declare(queue="Kadima", durable=True) message = "You are awsome!" for i in range(0, 100): # 循環100次發送消息 channel.basic_publish(exchange="", routing_key='Kadima', body=message + " " + str(i), properties=pika.BasicProperties(delivery_mode=2)) print "sending ", message
work代碼:class
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): # <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic. # Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50 # ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y # ou are awsome! 98 # temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」 # global var # var+=1 # if var==20: # print var , body # sys.exit() print "1 received %r" % (body,) # time.sleep(0.1) print "Done" #設置返回ack的標誌,method.delivery_tag是MQ分發給Work時的一個標記 ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.basic_qos(prefetch_count=1) channel.queue_declare(queue="Kadima",durable=True) channel.basic_consume(callback,queue="Kadima") print ' [1] Waiting for messages' channel.start_consuming()
work2.py
import time import pika __author__ = 'Yue' def callback(ch, method, properties, body): print "2 received %r" % (body,) # time.sleep(0.1) print "Done" ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.basic_qos(prefetch_count=1) channel.queue_declare(queue="Kadima",durable=True) channel.basic_consume(callback,queue="Kadima") print ' [2] Waiting for messages' channel.start_consuming()
3運行結果
能夠看出server對任務的分發已經變得隨機,而不是原來的依次分發