1,生產者python
new_task.py函數
import pika if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") message="You are awsome!" for i in range(0,100):#循環100次發送消息 channel.basic_publish(exchange="",routing_key='Kadima',body=message+" "+str(i)) print "sending ",message
2,多個消費者spa
消費者1,work.pycode
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): # temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」 #global var #var+=1 #if var==20: #sys.exit() print "1 received %r" % (body,) time.sleep(body.count(".")) print "Done" if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima",no_ack=True) print ' [1] Waiting for messages' channel.start_consuming()
work2.pyit
import time import pika __author__ = 'Yue' def callback(ch, method, properties, body): print "2 received %r" % (body,) time.sleep(body.count(".")) print "Done" if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima",no_ack=True) print ' [2] Waiting for messages' channel.start_consuming()
3,執行work,work2,new_taskio
個人啓動順序是work,work2,從執行結果能夠看出,RabbitMQ是將task分別依次分發給按照時間順序註冊的work上的,class
也就是,task1,task2,task3,task4,它會將task1,task3分發給work,另外兩個分發給task3,task4import
接下來,有趣的事情就要發生了:變量
當把work.py中的callback函數的註釋內容打開後(起做用是讓work處理19個task,便退出程序),MQ並無將本該分發給work的task分發給work2,那到底去哪裏了呢?我暫時假設爲work退出時並無告訴MQ他不幹了(他出現異常啦),MQ仍是會將task分發給workobject
4,那沒有執行完的任務怎麼辦呢?
Message acknowledgment :ack默認是打開的
修改work代碼以下
#-*- coding: UTF-8 -*- import time import pika import sys __author__ = 'Yue' var=0 def callback(ch, method, properties, body): print ch, method, properties, body # <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic. # Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50 # ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y # ou are awsome! 98 temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」 global var var+=1 if var==20: print var , body sys.exit() print "1 received %r" % (body,) # time.sleep(0.1) print "Done" #設置返回ack的標誌,method.delivery_tag是MQ分發給Work時的一個標記 ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima") print ' [1] Waiting for messages' channel.start_consuming()
work2
def callback(ch, method, properties, body): print "2 received %r" % (body,) # time.sleep(0.1) print "Done" ch.basic_ack(delivery_tag = method.delivery_tag) if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.queue_declare("Kadima") channel.basic_consume(callback,queue="Kadima") print ' [2] Waiting for messages' channel.start_consuming()
因而可知,MQ會從新將沒有執行,或執行失敗的任務從新分發給存活的work2,並且,他的分發順序也頗有趣,是在本來應該分發給work2的task執行結束後再去分發未執行的任務。‘
5,思考,若是在work2中
channel.basic_consume(callback,queue="Kadima",no_ack=True)
會出現什麼狀況。。。