【RabbitMQ 服務器】
html
# 在 vhosttest 裏面有 exchangetest 和 queuetest 經過 rkeytest 綁定 Broker: 192.168.0.xx virtual host: vhosttest Exchange: exchangetest Queue: queuetest Routing key: rkeytest
【Python 環境】node
OS: Windows 10 Python: 3.6.3 x64 pika: 0.11.2
【查看隊列狀態】python
# 經過瀏覽器查看隊列狀態 http://192.168.0.xx:15672/api/queues/vhosttest/queuetest # 經過命令行查看隊列狀態 curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | jq # 經過命令行查看隊列長度(messages = messages_ready + messages_unacknowledged) curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | \ jq '.messages'
【send.py】git
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 發送方/生產者 import os, sys, time import pika def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.xx", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 鏈接 RabbitMQ channel = connection.channel() # 建立頻道 queue = channel.queue_declare(queue='queuetest') # 聲明或建立隊列 while True: # 循環向隊列中發送信息 message = time.strftime('%H:%M:%S', time.localtime()) channel.basic_publish(exchange='exchangetest', routing_key='rkeytest', body=message) print('send message: %s' % message) while True: # 檢查隊列,以從新獲得消息計數 queue = channel.queue_declare(queue='queuetest', passive=True) ''' queue.method.message_count 獲取的爲 ready 的消息數 截至 2018-03-06(pika 0.11.2) walker 沒找到利用 pika 獲取 unack 或者 total 消息數的方法 ''' messageCount = queue.method.message_count print('messageCount: %d' % messageCount) if messageCount < 100: break connection.sleep(1) # 關閉鏈接 connection.close() if __name__ == '__main__': Main()
【recv.py - 版本1】github
一個消費者api
#encoding: utf-8 #author: walker #date: 2018-01-31 #summary: 接收方/消費者 import os, sys, time import pika # 接收處理消息的回調函數 def ConsumerCallback (channel, method, properties, body): print("Received %s" % body) def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.xx", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 鏈接 RabbitMQ channel = connection.channel() # 建立頻道 queue = channel.queue_declare(queue='queuetest') # 聲明或建立隊列 # no_ack=True 開啓自動確認,否則消費後的消息會一直留在隊列裏面 # no_ack = no_manual_ack = auto_ack;不手動應答,開啓自動應答模式 channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True) print('Wait Message ...') channel.start_consuming() if __name__ == '__main__': Main()
【recv.py - 版本2】瀏覽器
利用多線程實現多個消費者同時消費安全
#encoding: utf-8 #author: walker #date: 2018-03-9 #summary: 接收方/消費者 import os, sys, time import pika import threading from queue import Queue GlobalQueue = Queue(10) class Consumer(threading.Thread): def run(self): while True: task = GlobalQueue.get() print('thread-%d,\ttask: %s' % (threading.get_ident(), task)) # 接收處理消息的回調函數 def ConsumerCallback (channel, method, properties, body): # 將消息推入隊列 GlobalQueue.put(body) def Main(): credentials = pika.PlainCredentials("test", "test") parameters = pika.ConnectionParameters(host="192.168.0.86", virtual_host='vhosttest', credentials=credentials) connection = pika.BlockingConnection(parameters) # 鏈接 RabbitMQ channel = connection.channel() # 建立頻道 channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消費 # no_ack=True 開啓自動確認,否則消費後的消息會一直留在隊列裏面 # no_ack = no_manual_ack = auto_ack;不手動應答,開啓自動應答模式 channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True) print('Wait Message ...') for i in range(3): c = Consumer() c.start() channel.start_consuming() # 開始接收任務 if __name__ == '__main__': Main()
【後記】bash
若是但願不經過 exchange 路由轉發,直接給隊列發送消息,能夠將 exchange 設爲空字符串,routing_key 設爲隊列名。服務器
channel.basic_publish(exchange='', routing_key='queuetest', body=task)
有時候 basic_get 比 basic_consume 更方便。
【0.x 到 1.x 的遷移】
pika.ConnectionParameters
# 0.x 版本 pika.ConnectionParameters(host=Host, virtual_host=VirtualHost, credentials=pika.PlainCredentials(User, Pwd), heartbeat_interval=0) # 1.x 版本 pika.ConnectionParameters(host=MQHost, virtual_host=MQVirtualHost, credentials=credentials, heartbeat=0)
channel.basic_qos
# 0.x 版本 channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 1.x 版本 channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
channel.basic_consume,終於換掉了坑爹的 no_ack 命名
# 0.x 版本 # no_ack = no_manual_ack = auto_ack;不手動應答,開啓自動應答模式 channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False) # 1.x 版本 channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)
【相關閱讀】
pika 並不是線程安全:Is Pika thread safe?(FAQ)
*** walker ***