python-RabbtiMQ消息隊列

1.RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。 AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

 

2.RabbitMQ能爲你作些什麼?

 

消息系統容許軟件、應用相互鏈接和擴展.這些應用能夠相互連接起來組成一個更大的應用,或者將用戶設備和數據進行鏈接.消息系統經過將消息的發送和接收分離來實現應用程序的異步和解偶. 或許你正在考慮進行數據投遞,非阻塞操做或推送通知。或許你想要實現發佈/訂閱,異步處理,或者工做隊列。全部這些均可以經過消息系統實現。 RabbitMQ是一個消息代理 - 一個消息系統的媒介。它能夠爲你的應用提供一個通用的消息發送和接收平臺,而且保證消息在傳輸過程當中的安全。

 

3.RabbitMQ 安裝使用

 

 

4.Python應用RabbitMQ

python操做RabbitMQ的模塊有三種:pika,Celery,Haigha。 本文使用的是pika。
""" RabbitMQ-生產者。 """

import pika """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識""" channel.queue_declare(queue='hello') """定義queue中的消息內容""" channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
""" RabbitMQ-消費者。 """

import pika """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識,與生產者隊列中對應""" channel.queue_declare(queue='hello') def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) print('rev messages-->',body) """消費,接收消息...""" channel.basic_consume( consumer_callback=callback, # 若是收到消息,則回調這個函數處理消息
    queue='hello', # queue_declare(queue='hello') 對應
    no_ack=True ) """ 消費者會一直監聽這queue,若是隊列中沒有消息,則會卡在這裏,等待消息隊列中生成消息。 """

print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()

 

5.RabbitMQ消息持久化

import pika queue_name = 'xiaoxi_'

"""聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識 queue,durable 持久化 """ channel.queue_declare(queue=queue_name) while True: input_value = input(">>:").strip() if input_value: """定義queue中的消息內容"""
        print('producer messages:{0}'.format(input_value)) channel.basic_publish( exchange='', routing_key=queue_name, body=input_value, properties=pika.BasicProperties(   # 消息持久化.....
                delivery_mode=2, ) ) continue
producer.py
import pika,time queue_name = 'xiaoxi_'

"""聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識""" channel.queue_declare(queue=queue_name) def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) #time.sleep(5) # 模擬消費者丟失生產者發送的消息,生產者消息隊列中的這一條消息則不會刪除。
    print('rev messages-->',body) """手動向生產者確認收到消息"""
    #ch.basic_ack(delivery_tag=method.delivery_tag)

"""消費,接收消息...""" channel.basic_consume( consumer_callback=callback, # 若是收到消息,則回調這個函數處理消息
    queue=queue_name, #no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
) print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()
consumer.py

 

6.RabbitMQ消息公平分發

import pika queue_name = 'xiaoxi_1'

"""聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識 queue,durable 持久化 """ channel.queue_declare(queue=queue_name) while True: input_value = input(">>:").strip() if input_value: """定義queue中的消息內容"""
        print('producer messages:{0}'.format(input_value)) channel.basic_publish( exchange='', routing_key=queue_name, body=input_value, ) continue
producer.py
import pika,time queue_name = 'xiaoxi_1'

"""聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """定義一個queue,定義queue名稱,標識 queue,durable 持久化 """ channel.queue_declare(queue=queue_name) def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) print('rev messages-->',body) """模擬處理消息快慢速度""" time.sleep(1) ch.basic_ack(delivery_tag=method.delivery_tag) """根據消費者處理消息的快慢公平分發消息""" channel.basic_qos(prefetch_count=1) """消費,接收消息...""" channel.basic_consume( consumer_callback=callback,  # 若是收到消息,則回調這個函數處理消息
    queue=queue_name, # no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
) print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()
consumer.py

 

7.RabbitMQ-廣播模式。

 消息的發送模式類型 1.fanout: 全部bind到此exchange的queue均可以接收消息 便是廣播模式,全部的consumer都能收到。 2.direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。 3.topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息。符合條件的。 表達式符號說明:#表明一個或多個字符,*表明任何字符
            例:#.a會匹配a.a,aa.a,aaa.a等
                  *.a會匹配a.a,b.a,c.a等 注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 
        4.headers: 經過headers 來決定把消息發給哪些queue (少用)

 

7.1 topic 廣播模式。

import pika """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。""" exchange_name = 'topic_messages1' routing_key = 'my_topic'


"""定義exchage模式 direct廣播模式""" channel.exchange_declare(exchange=exchange_name,exchange_type='topic') """ 消息的發送模式類型 1.fanout: 全部bind到此exchange的queue均可以接收消息 便是廣播模式,全部的consumer都能收到。 2.direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。 3.topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息。符合條件的。 表達式符號說明:#表明一個或多個字符,*表明任何字符 例:#.a會匹配a.a,aa.a,aaa.a等 *.a會匹配a.a,b.a,c.a等 注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout  4.headers: 經過headers 來決定把消息發給哪些queue (少用) """


while True: input_value = input(">>:").strip() if input_value: """定義queue中的消息內容"""
        print('producer messages:{0}'.format(input_value)) channel.basic_publish( exchange=exchange_name, routing_key=routing_key, body=input_value, ) continue
producer.py
import pika,time """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。""" exchange_name = 'topic_messages1' routing_key = 'my_topic' channel.exchange_declare(exchange=exchange_name,exchange_type='topic') """不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除""" res = channel.queue_declare(exclusive=True) queue_name = res.method.queue channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key) print('direct_key:{0}'.format(routing_key)) print('queue_name:{0}'.format(queue_name)) def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) print('rev messages-->',body) ch.basic_ack(delivery_tag=method.delivery_tag) """消費,接收消息...""" channel.basic_consume( consumer_callback=callback,  # 若是收到消息,則回調這個函數處理消息
    queue=queue_name, ) print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()
consumer.py

 

7.2 direct 廣播模式

import pika connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() """經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。""" exchange_name = 'direct_messages' routing_key = 'my_direct'

""" 定義exchage模式 direct廣播模式 消息的發送模式類型 1.fanout: 全部bind到此exchange的queue均可以接收消息 便是廣播模式,全部的consumer都能收到。 2.direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。 3.topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息。符合條件的。 表達式符號說明:#表明一個或多個字符,*表明任何字符 例:#.a會匹配a.a,aa.a,aaa.a等 *.a會匹配a.a,b.a,c.a等 注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout  4.headers: 經過headers 來決定把消息發給哪些queue (少用) """ channel.exchange_declare(exchange=exchange_name,exchange_type='direct') channel.basic_publish( exchange=exchange_name, routing_key=routing_key, body='hello word!', ) # while True: # input_value = input(">>:").strip() # if input_value: # """定義queue中的消息內容""" # print('producer messages:{0}'.format(input_value)) # channel.basic_publish( # exchange=exchange_name, # routing_key=routing_key, # body=input_value, # ) # continue
producer.py
import pika,time connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) channel = connection.channel() exchange_name = 'direct_messages' routing_key = 'my_direct' channel.exchange_declare(exchange=exchange_name,exchange_type='direct') """不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除""" res = channel.queue_declare(exclusive=True) queue_name = res.method.queue channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key) print('direct_key:{0}'.format(routing_key)) print('queue_name:{0}'.format(queue_name)) def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) print('rev messages-->',body) ch.basic_ack(delivery_tag=method.delivery_tag) """消費,接收消息...""" channel.basic_consume( consumer_callback=callback,  # 若是收到消息,則回調這個函數處理消息
    queue=queue_name, ) print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()
consumer.py

 

7.3 fanout 廣播模式

import pika """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() exchange_name = 'messages'
"""定義exchage模式 fanout廣播模式""" channel.exchange_declare(exchange=exchange_name,exchange_type='fanout') """ 消息的發送模式類型 1.fanout: 全部bind到此exchange的queue均可以接收消息 便是廣播模式,全部的consumer都能收到。 2.direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息 ,指定惟一的。 3.topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息。符合條件的。 表達式符號說明:#表明一個或多個字符,*表明任何字符 例:#.a會匹配a.a,aa.a,aaa.a等 *.a會匹配a.a,b.a,c.a等 注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout  4.headers: 經過headers 來決定把消息發給哪些queue (少用) """


while True: input_value = input(">>:").strip() if input_value: """定義queue中的消息內容"""
        print('producer messages:{0}'.format(input_value)) channel.basic_publish( exchange=exchange_name, routing_key='', body=input_value, ) continue
producer.py
import pika,time """聲明socket""" connection = pika.BlockingConnection( pika.ConnectionParameters('localhost') ) """聲明一個管道""" channel = connection.channel() """
    
""" exchange_name = 'messages' channel.exchange_declare(exchange=exchange_name,exchange_type='fanout') """不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除""" res = channel.queue_declare(exclusive=True) queue_name = res.method.queue channel.queue_bind(exchange=exchange_name,queue=queue_name) """每個消費者隨機一個惟一的queue_name"""
print('queue_name:{0}',format(queue_name)) def callback(ch,method,properties,body): print('rev-->',ch,method,properties,body) print('rev messages-->',body) ch.basic_ack(delivery_tag=method.delivery_tag) """消費,接收消息...""" channel.basic_consume( consumer_callback=callback,  # 若是收到消息,則回調這個函數處理消息
    queue=queue_name, # no_ack=True #接收到消息,主動向生產者確認已經接收到消息。
) print('waiting for meassages, to exit press CTRL+C') channel.start_consuming()
consumer.py

 

8 RabbitMQ 實現 RPC

""" RabbitMQ-生產者。 利用rabbitMQ 實現一個能收能發的RPC小程序。 重點須要注意的是:queue的綁定。接收的一端必選預先綁定queue生成隊列,發送端才能根據queue發送。 """

import pika,uuid,time class rabbitmqClient(object): def __init__(self,rpc_queue): self.rpc_queue = rpc_queue self.app_id = str(uuid.uuid4()) self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() """生成一個自動queue,傳過去server,server再往這個自動queue回覆數據""" autoqueue = self.channel.queue_declare(exclusive=True) self.callback_queue = autoqueue.method.queue """先定義一個接收回復的動做""" self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self,ch,method,properties,body): if self.app_id == properties.app_id: self.response = body def send(self,msg): self.response = None self.channel.basic_publish( exchange='', routing_key=self.rpc_queue, properties=pika.BasicProperties( reply_to=self.callback_queue, app_id=self.app_id, ), body=str(msg) ) # 發送完消息,進入接收模式。
        while self.response is None: # print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))
 self.connection.process_data_events() # time.sleep(0.5)
        return self.response rpc_request_queue = 'rpc_request_queue' rb = rabbitmqClient(rpc_request_queue) while True: msg = input('input >> :').strip() if msg: print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id)) print('send msg:{}'.format(msg)) reponses = rb.send(msg) print('reponses msg:{}'.format(reponses.decode('utf-8'))) continue
client.py
""" RabbitMQ-消費者。 """

import pika class rabbitmqServer(object): def __init__(self,rpc_queue): self.rpc_queue = rpc_queue self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) self.channel = self.connection.channel() self.channel.queue_declare(queue=self.rpc_queue) def on_reponses(self,ch,method,properties,body): if body: # reponser ...
            ch.basic_publish(exchange='', routing_key=properties.reply_to, properties=pika.BasicProperties( reply_to=properties.reply_to, app_id=properties.app_id, ), body='reponses ok! msg is:{}'.format(body.decode('utf-8'))) def start_consuming(self): self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True) print('waiting for meassages, to exit press CTRL+C') self.channel.start_consuming() rpc_request_queue = 'rpc_request_queue' rd_server = rabbitmqServer(rpc_request_queue) rd_server.start_consuming()
server.py
相關文章
相關標籤/搜索