RabbitMQ應用示例

更多詳情參考官方文檔:https://www.rabbitmq.com/tutorials/tutorial-six-python.htmlhtml

參考博客:http://www.javashuo.com/article/p-zeofyjvm-ez.htmlpython

 微服務通訊RPCjson

01-HelloWorld(簡單的消息隊列)服務器

  send.py  app

import pika
#與RabbitMQ服務器創建鏈接
credential = pika.PlainCredentials('yang','abc123456')#erase_on_connect是否清楚憑證
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',credentials=credential))
#建立頻道
channel = connection.channel()#channel_number參數指定頻道編號,建議默認pika自行管理
#建立頻道傳遞消息的隊列
channel.queue_declare(queue='hello')



#向頻道隊列中發送消息
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='hello world!')

print('[x]消息發送成功!')
connection.close()#斷開鏈接
send.py

  receive.pydom

import pika
credentials = pika.PlainCredentials('yang','abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1',port=5672,credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(f'{body.decode()}')

channel.basic_consume(queue='hello',
                      auto_ack=True,#開啓自動確認關閉手動確認
                      on_message_callback=callback)

print(' [*] Waiting for messages. ')
channel.start_consuming()
recieve.py

 

02-WorkQueues(任務隊列)ide

  new_task.py  微服務

'''
任務生產者將任務發送到指定隊列中,多個工做者進行均分協同處理(啓多個工做者)
'''
import pika
credentials = pika.PlainCredentials('yang','abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出現異常不會丟失。注意basic_publish須要設置參數

for i in range(1000):
    message = f'new_task{i}...'
    channel.basic_publish(
        exchange='',
        routing_key='work_queue',
        body=message,
        properties = pika.BasicProperties(delivery_mode=2, )# 支持數據持久化:2表明消息是持久
    )
    print(f'Send>>>{message}')

connection.close()
new_task.py

  worker.py測試

import time
import random
import pika
credentials = pika.PlainCredentials('yang','abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',5672,credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='work_queue',durable=True)#durable=True指定消息持久化,出現異常不會丟失

def callback(ch, method, properties, body):
    print(f'Rceive>>{body}')
    time.sleep(random.random())
    print(f'Done--{body.decode()}')
    # 手動確認機制(在消費者掛掉沒有給確認時,消息不會丟失)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)#此設置確保在沒有進行確認以前當前接受得任務只有1個(不設置默認是平均分配)
channel.basic_consume(
    queue='work_queue',
    on_message_callback=callback
)

channel.start_consuming()
worker.py

 

03-PublishSubcribe(訂閱發佈)fetch

  publish.py 

'''
發佈者發佈一條任務,經過交換機發送到與此交換機簡歷鏈接的全部隊列中進行共享(啓多個訂閱者)
'''
import pika
credentials = pika.PlainCredentials('yang','abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))

channel = connection.channel()

#建立頻道的交換器(交換器負責將任務發送到鏈接詞交換器的全部的隊列中)
channel.exchange_declare(exchange='logs',
                         exchange_type='fanout',#建立一個fanout(廣播)類型的交換機exchange,名字爲logs
                         durable=True)#durabl持久化

for i in range(1000):
    message = f'new_task{i}...'
    channel.basic_publish(
        exchange='logs',#指定交換機
        routing_key='',#無需指定路由鍵隊列,由交換機進行發送
        body=message,
        properties = pika.BasicProperties(delivery_mode=2, )# delivery_mode支持數據持久化:2表明消息是持久
    )
    print(f'Send>>>{message}')

connection.close()
publish.py

  subcribe.py

import pika
credentials = pika.PlainCredentials('yang','abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, credentials=credentials))

channel = connection.channel()

#與交換器創建鏈接
channel.exchange_declare(exchange='logs',exchange_type='fanout',durable=True)#durable持久化

#肯定消息隊列(不指定隊列名,每一個訂閱者rabbirmq服務器鏈接都會建立一個隨機隊列)
result= channel.queue_declare(queue='',exclusive=True,durable=True)#exclusive=True當斷開鏈接時,隊列銷燬(持久化沒有用,設置了斷開銷燬)
queue_name= result.method.queue
#與交換機新建的的隨機隊列進行綁定
channel.queue_bind(exchange='logs',queue=queue_name)

def callback(ch, method, properties, body):
    print(f'Rceive>>{body}')
    print(f'Done--{body.decode()}')
    # 手動確認機制(在消費者掛掉沒有給確認時,消息不會丟失)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback
)

channel.start_consuming()
subcribe.py

 

06-RPC(遠程過程調用)

  rpc_client.py

import pika
import uuid


class RpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('yang', 'abc123456')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials))

        self.channel = self.connection.channel()

        # 隨機建立一個惟一隊列
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        # 接收回調隊列中的消息
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, m, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # 想通訊隊列放入消息
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                delivery_mode=2,#持久化參數
                # content_type='application/json',#指定body數據類型,不指定則爲字符串(有待測試)
                reply_to=self.callback_queue,  # 指定回調隊列
                correlation_id=self.corr_id,  # 指定本次請求標識id
            ),
            body=str(m) + ',' + str(n)
        )


        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


clinet_rpc = RpcClient()

print(" [x] Requesting add(30,30)")
response = clinet_rpc.call(30,20)
print(" [.] Got %r" % response)
rpc_client.py

  rpc_server.py

import pika

credentials = pika.PlainCredentials('yang', 'abc123456')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue', durable=True)  # durable=True消息持久化


def add(m, n):
    return m + n

def on_request(ch, method, props, body):
    m, n = body.decode().split(',')
    response = add(int(m), int(n))

    #將響應信息放入指定的回調隊列中
    channel.basic_publish(
        exchange='',
        routing_key=props.reply_to,#請求指定的響應隊列
        properties=pika.BasicProperties(correlation_id=props.correlation_id,),#請求設定的標識(無需設置持久化,每次響應以後回調隊列銷燬)
        body=str(response)
    )
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手動回覆確認


channel.basic_qos(prefetch_count=1)  # 在沒有確認前只分配一個任務
channel.basic_consume(
    queue='rpc_queue',
    on_message_callback=on_request,
)

print('Awaiting RPC requests...')
channel.start_consuming()
rpc_server.py  
相關文章
相關標籤/搜索