更多詳情參考官方文檔:https://www.rabbitmq.com/tutorials/tutorial-six-python.htmlhtml
參考博客:http://www.javashuo.com/article/p-zeofyjvm-ez.htmlpython
微服務通訊RPCjson
01-HelloWorld(簡單的消息隊列)服務器
send.py app
import pika #與RabbitMQ服務器創建鏈接 credential = pika.PlainCredentials('yang','abc123456')#erase_on_connect是否清楚憑證 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',credentials=credential)) #建立頻道 channel = connection.channel()#channel_number參數指定頻道編號,建議默認pika自行管理 #建立頻道傳遞消息的隊列 channel.queue_declare(queue='hello') #向頻道隊列中發送消息 channel.basic_publish(exchange='', routing_key='hello', body='hello world!') print('[x]消息發送成功!') connection.close()#斷開鏈接
receive.pydom
import pika credentials = pika.PlainCredentials('yang','abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1',port=5672,credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(f'{body.decode()}') channel.basic_consume(queue='hello', auto_ack=True,#開啓自動確認關閉手動確認 on_message_callback=callback) print(' [*] Waiting for messages. ') channel.start_consuming()
02-WorkQueues(任務隊列)ide
new_task.py 微服務
''' 任務生產者將任務發送到指定隊列中,多個工做者進行均分協同處理(啓多個工做者) ''' import pika credentials = pika.PlainCredentials('yang','abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出現異常不會丟失。注意basic_publish須要設置參數 for i in range(1000): message = f'new_task{i}...' channel.basic_publish( exchange='', routing_key='work_queue', body=message, properties = pika.BasicProperties(delivery_mode=2, )# 支持數據持久化:2表明消息是持久 ) print(f'Send>>>{message}') connection.close()
worker.py測試
import time import random import pika credentials = pika.PlainCredentials('yang','abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出現異常不會丟失 def callback(ch, method, properties, body): print(f'Rceive>>{body}') time.sleep(random.random()) print(f'Done--{body.decode()}') # 手動確認機制(在消費者掛掉沒有給確認時,消息不會丟失) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1)#此設置確保在沒有進行確認以前當前接受得任務只有1個(不設置默認是平均分配) channel.basic_consume( queue='work_queue', on_message_callback=callback ) channel.start_consuming()
03-PublishSubcribe(訂閱發佈)fetch
publish.py
''' 發佈者發佈一條任務,經過交換機發送到與此交換機簡歷鏈接的全部隊列中進行共享(啓多個訂閱者) ''' import pika credentials = pika.PlainCredentials('yang','abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials)) channel = connection.channel() #建立頻道的交換器(交換器負責將任務發送到鏈接詞交換器的全部的隊列中) channel.exchange_declare(exchange='logs', exchange_type='fanout',#建立一個fanout(廣播)類型的交換機exchange,名字爲logs durable=True)#durabl持久化 for i in range(1000): message = f'new_task{i}...' channel.basic_publish( exchange='logs',#指定交換機 routing_key='',#無需指定路由鍵隊列,由交換機進行發送 body=message, properties = pika.BasicProperties(delivery_mode=2, )# delivery_mode支持數據持久化:2表明消息是持久 ) print(f'Send>>>{message}') connection.close()
subcribe.py
import pika credentials = pika.PlainCredentials('yang','abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials)) channel = connection.channel() #與交換器創建鏈接 channel.exchange_declare(exchange='logs',exchange_type='fanout',durable=True)#durable持久化 #肯定消息隊列(不指定隊列名,每一個訂閱者rabbirmq服務器鏈接都會建立一個隨機隊列) result= channel.queue_declare(queue='',exclusive=True,durable=True)#exclusive=True當斷開鏈接時,隊列銷燬(持久化沒有用,設置了斷開銷燬) queue_name= result.method.queue #與交換機新建的的隨機隊列進行綁定 channel.queue_bind(exchange='logs',queue=queue_name) def callback(ch, method, properties, body): print(f'Rceive>>{body}') print(f'Done--{body.decode()}') # 手動確認機制(在消費者掛掉沒有給確認時,消息不會丟失) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume( queue=queue_name, on_message_callback=callback ) channel.start_consuming()
06-RPC(遠程過程調用)
rpc_client.py
import pika import uuid class RpcClient(object): def __init__(self): credentials = pika.PlainCredentials('yang', 'abc123456') self.connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials)) self.channel = self.connection.channel() # 隨機建立一個惟一隊列 result = self.channel.queue_declare(queue='', exclusive=True) self.callback_queue = result.method.queue # 接收回調隊列中的消息 self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, m, n): self.response = None self.corr_id = str(uuid.uuid4()) # 想通訊隊列放入消息 self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( delivery_mode=2,#持久化參數 # content_type='application/json',#指定body數據類型,不指定則爲字符串(有待測試) reply_to=self.callback_queue, # 指定回調隊列 correlation_id=self.corr_id, # 指定本次請求標識id ), body=str(m) + ',' + str(n) ) while self.response is None: self.connection.process_data_events() return int(self.response) clinet_rpc = RpcClient() print(" [x] Requesting add(30,30)") response = clinet_rpc.call(30,20) print(" [.] Got %r" % response)
rpc_server.py
import pika credentials = pika.PlainCredentials('yang', 'abc123456') connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials)) channel = connection.channel() channel.queue_declare(queue='rpc_queue', durable=True) # durable=True消息持久化 def add(m, n): return m + n def on_request(ch, method, props, body): m, n = body.decode().split(',') response = add(int(m), int(n)) #將響應信息放入指定的回調隊列中 channel.basic_publish( exchange='', routing_key=props.reply_to,#請求指定的響應隊列 properties=pika.BasicProperties(correlation_id=props.correlation_id,),#請求設定的標識(無需設置持久化,每次響應以後回調隊列銷燬) body=str(response) ) ch.basic_ack(delivery_tag=method.delivery_tag) # 手動回覆確認 channel.basic_qos(prefetch_count=1) # 在沒有確認前只分配一個任務 channel.basic_consume( queue='rpc_queue', on_message_callback=on_request, ) print('Awaiting RPC requests...') channel.start_consuming()