【RabbitMQ 服務器】python
# 在 vhosttest 裏面有 exchangetest 和 queuetest 經過 rkeytest 綁定 Broker: 192.168.0.xx virtual host: vhosttest Exchange: exchangetest Queue: queuetest Routing key: rkeytest
【Python 環境】api
OS: Windows 10 Python: 3.6.3 x64 kombu: 4.1.0
【查看隊列狀態】瀏覽器
# 經過瀏覽器查看隊列狀態 http://192.168.0.xx:15672/api/queues/vhosttest/queuetest # 經過命令行查看隊列狀態curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | jq # 經過命令行查看隊列長度(messages = messages_ready + messages_unacknowledged) curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest | \ jq '.messages'
【send.py】
服務器
#encoding: utf-8 #author: walker #date: 2018-03-09 #summary: 發送方/生產者 import os, sys, time from kombu import Connection def Main(): with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn: with conn.channel() as channel: #producer = Producer(channel) producer = channel.Producer() while True: message = time.strftime('%H:%M:%S', time.localtime()) producer.publish( body=message, retry=True, exchange='exchangetest', routing_key='rkeytest' ) print('send message: %s' % message) while True: # 檢查隊列,以從新獲得消息計數 queue = channel.queue_declare(queue='queuetest', passive=True) messageCount = queue.message_count print('messageCount: %d' % messageCount) if messageCount < 100: break time.sleep(1) if __name__ == '__main__': Main()
【recv.py】curl
#encoding: utf-8 #author: walker #date: 2018-03-09 #summary: 接收方/消費者 import os, sys, time from kombu import Connection, Queue from kombu.mixins import ConsumerMixin class C(ConsumerMixin): def __init__(self, connection, queueNmae): self.connection = connection self.queues = [Queue(queueNmae, durable=False)] def get_consumers(self, Consumer, channel): return [ Consumer(self.queues, callbacks=[self.on_message]), ] # 接收處理消息的回調函數 def on_message(self, body, message): print("Received %s" % body) message.ack() def Main(): with Connection('amqp://test:test@192.168.0.xx:5672/vhosttest') as conn: C(conn, 'queuetest').run() if __name__ == '__main__': Main()
【相關閱讀】ide
*** walker ***