RabbitMQ 全套

本博客代碼運行環境

     ErLang:      ErLang_X64_22 version
     RabbitMQ:    RabbitMQ_Server_3.7.15 version
     python :     Python 3.7.1rc1   version
     pip :       pip 19.1.1  version
     pika :       pika 1.0.1 version

什麼是MQ

消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不一樣進程Process/線程Thread之間通訊。
爲何會產生消息隊列?有幾個緣由:

不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;

不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;

關於消息隊列的詳細介紹請參閱:
《Java帝國之消息隊列》
《一個故事告訴你什麼是消息隊列》
《到底何時該使用MQ》

MQ框架很是之多,比較流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里開源的RocketMQ。本文主要介紹RabbitMq。

 本教程pdf及代碼下載地址
代碼:https://download.csdn.net/download/zpcandzhj/10585077
教程:https://download.csdn.net/download/zpcandzhj/10585092html

 

RabbitMQ

RabbitMQ簡介

    
    一、MQ 爲 Message Queue, 消息隊列是應用程序和應用程序之間的通訊方法,
    二、RabbitMQ 是一個開源的, 在AMQP 基礎上完整的,可複用的企業消息系統,消息中間件 , 消息隊列
    三、支持主流的 OS, Linux, Windows, MacOX 等,
    四、多種開發語言支持, java、Python、Ruby、.NET、PHP、C/C++、node.js 等
    五、是專業級別的, 甩 python 的隊列好幾條街
    六、開發語言: Erlang----面向併發的編程語言----- 愛立信公司, 能夠作到 熱插拔, 局部加載, 不須要重啓整個服務
 
   AMQP: 消息隊列的一個協議。

搭建RabbitMQ環境:windows下安裝

第一步 提供 Erlang 編程語言環境-----》安裝 Erlang

官網: http://www.erlang.org/download ,java

安裝RabbitMQ

官網: https://www.rabbitmq.com/install-windows.htmlnode

安裝完成: 打開 cmd 命令行工具, cd 到 RabbitMQ 的安裝目錄下的 sbin/ 子目錄 中。 如圖:python

一、啓動管理工具插件: rabbitmq-plugins enable rabbitmq_managementgit

二、啓動 RabbitMQ 服務:net start rabbitmqgithub

三、瀏覽器輸入地址: http://127.0.0.1:15672/編程

    四、使用 默認帳號管理: guest/ guest  , 可以登錄 ,說明安裝成功json

    4.一、 添加 admin 用戶:windows

     4.二、用戶角色: 瀏覽器

   一、超級管理員(administrator)
      可登錄管理控制檯,可查看全部的信息,而且能夠對用戶,策略(policy)進行操做。
   二、監控者(monitoring)
      可登錄管理控制檯,同時能夠查看rabbitmq節點的相關信息(進程數,內存使用狀況,磁盤使用狀況等)
   三、策略制定者(policymaker)
     可登錄管理控制檯, 同時能夠對policy進行管理。但沒法查看節點的相關信息(上圖紅框標識的部分)。
   四、普通管理者(management)
      僅可登錄管理控制檯,沒法看到節點信息,也沒法對策略進行管理。
   五、其餘
      沒法登錄管理控制檯,一般就是普通的生產者和消費者。

 

     4.三、建立Virtual Hosts

virtual hosts

        選中Admin用戶,設置權限:

permissions

     看到權限已加

look

4.四、 管理界面中的功能: 

vir

shuo

   4.五、 管理工具中 查看隊列消息:

    點擊上面的隊列名稱,查詢具體的隊列中的信息:

 

使用 rabbitMQ 命令 添加用戶並設置權限的步驟:

   1. 建立用戶:      rabbitmqctl add_user name password
   2. 設置用戶角色:   rabbitmqctl set_user_tags name administrator
   3. 遠程鏈接需設置用戶權限, 表明容許從外面訪問: 
      rabbitmqctl set_permissions -p / name ".*" ".*" ".*"
    解析:          set_permissions [-p vhost] {user} {conf} {write} {read}

   RabbitMQ 經常使用命令: 

 1 1、rabbitmq 管理器插件的啓動和關閉:
 2     **啓動監控管理器:rabbitmq-plugins enable rabbitmq_management
 3     關閉監控管理器:rabbitmq-plugins disable rabbitmq_management
 4 2、服務的啓動與關閉:
 5      **啓動rabbitmq:rabbitmq-service start
 6      關閉rabbitmq:rabbitmq-service stop
 7      **使用 windows 命令: net start rabbitmq
 8                                  net stop rabbitmq
 9 3、rabbitmq服務器的啓動和關閉:
10     前臺啓動: rabbitmq-server start
11     後臺啓動: rabbitmq-server -detached
12     前臺中止:rabbitmqctl stop
13     查看 RabbitMQ 的狀態: rabbitmqctl status
14 4、rabbitmq 應用管理: 
15     關閉應用:rabbitmqctl stop_app
16     啓動應用:rabbitmqctl start_app
17 5、用戶管理: 
18     **添加用戶: rabbitmqctl add_user username password
19     列出全部用戶: rabbitmqctl list_users
20     刪除用戶: rabbitmqctl delete_user username 
21     **修改用戶密碼: rabbitmqctl change_password username 
22  newpassword
23 6、角色管理: 
24     **分配角色:rabbitmqctl set_user_tags username administrator
25 
26      角色說明
27              none 最小權限角色
28              management 管理員角色
29              policymaker 決策者
30              monitoring 監控
31              administrator 超級管理員
32 7. 權限管理: 
33    清除用戶權限: rabbitmqctl clear_permissions -p vhostpath user
34    **設置用戶權限: rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
35 8、虛擬主機管理:
36     列出全部虛擬主機: rabbitmqctl list_vhosts
37     建立虛擬主機:      rabbitmqctl list_vhost vhostpath
38     刪除虛擬主機:     rabbitmqctl delete_vhost vhostpath
39     列出虛擬主機全部權限:rabbitmqctl list_permissions -p vhostpath
40 9、隊列管理:
41    **查看全部的隊列:rabbitmqctl list_queues
42    清除全部的隊列:rabbitmqctl reset
43    查看全部綁定:    rabbitmqctl list_bindings
44    查看全部通道:    rabbitmqctl list_channels
45    查看全部鏈接:    rabbitmqctl list_connections
46    列出全部消費者: rabbitmqctl list_consumers
47    **列出全部交換機: rabbitmqctl list_exchanges

 

Python 操做 RabbitMQ 之深刻淺出

        此博客代碼託管地址:  https://github.com/SuoSuo-Rocky/RabbitMQ-FullStack

安裝 rabbitMQ module

 
 pip install pika 
 or 
 easy_install pika
 or 
 源碼 : https://pypi.python.org/pypi/pika 

實現最簡單的隊列通訊

 

send端

 1 #  發送端, 消費者
 2 import pika
 3 
 4 credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
 6 
 7 # 在鏈接之上建立一個 rabbit 協議的通道
 8 channel = connection.channel()
 9 
10 # 在通道中 聲明 一個 queue
11 channel.queue_declare(queue='hello')
12 
13 # 一個消息永遠不能直接發送到隊列,它老是須要通過一個交換
14 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
15 channel.basic_publish(exchange='',  # 交換機
16                       routing_key='hello',   # 路由鍵,寫明將消息發往哪一個隊列,本例是將消息發往隊列hello
17                       body='Hello World!')   # 生產者要發送的消息 內容
18 print(" [x] Sent 'Hello World!'")
19 connection.close()  # 當生產者發送完消息後,可選擇關閉鏈接

receive端

 

import pika
import time
credentials = pika.PlainCredentials('shiwei', 'shiwei666666')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
# 聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行
channel.queue_declare(queue='hello')

# 定義一個回調函數,用來接收生產者發送的消息
def callback(ch, method, properties, body):
    print("received msg...start processing....",body)
    time.sleep(5)
    print(" [x] msg process done....",body)

channel.basic_consume(on_message_callback=callback, # 定義一個回調函數,用來接收生產者發送的消息
                      auto_ack=True,
                      queue='hello',            # 指定取消息的隊列名
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()      #開始循環取消息

實現功能

   (1)rabbitmq循環調度公平分發,將消息循環發送給不一樣的消費者,如:消息1,3,5發送給消費者1;消息2,4,6發送給消費者2。
   (2)消息確認機制,爲了確保一個消息不會丟失,RabbitMQ支持消息的確認 , 一個 ack(acknowlegement) 是從消費者端發送一個確認去告訴RabbitMQ 消息已經接收了、處理了,RabbitMQ能夠釋放並刪除掉了。若是一個消費者死掉了(channel關閉、connection關閉、或者TCP鏈接斷開了)而沒有發送ack,RabbitMQ 就會認爲這個消息沒有被消費者處理,並會從新發送到生產者的隊列裏,若是同時有另一個消費者在線,rabbitmq將會將消息很快轉發到另一個消費者中。 那樣的話你就能確保雖然一個消費者死掉,但消息不會丟失。
     這個是沒有超時的,當消費方(consumer)死掉後RabbitMQ會從新轉發消息,即便處理這個消息須要很長很長時間也沒有問題。消息的 acknowlegments 默認是打開的,在前面的例子中關閉了: auto_ack = True . 如今刪除這個標識 而後 發送一個 acknowledgment。

消息持久化

 消息持久化,將消息寫入硬盤中。
       注意: (1)、RabbitMQ不容許你從新定義一個已經存在、但屬性不一樣的queue, 不然報錯
            (2)、標記消息爲持久化並不能徹底保證消息不會丟失,儘管已經告訴RabbitMQ將消息保存到磁盤,但 
       RabbitMQ接收到的消息在尚未保存的時候,仍然有一個短暫的時間窗口。RabbitMQ不會對每一個消息都執行同步 - 
      -- 可能只是保存到緩存cache尚未寫入到磁盤中。所以這個持久化保證並非很強,但這比咱們簡單的任務queue要 
      好不少,若是想要很強的持久化保證,可使用 publisher confirms。
     公平調度: 在一個消費者未處理完一個消息以前不要分發新的消息給它,而是將這個新消息分發給另外一個不是很忙的消費 
               者進行處理。爲了解決這個問題咱們能夠在消費者代碼中使用 channel.basic.qos (prefetch_count = 1 ),將消費者設置爲公平調度

生產者

  

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
chann = conn.channel()

# 源碼:
"""
#     def queue_declare(self,            # channel.queueDeclare 用來建立隊列,有5個參數:
#                       queue,           # String queue, 隊列名;
#                       passive=False,   # 
#                       durable=False,   # boolean durable, 該隊列是否須要持久化
#                       exclusive=False, # boolean exclusive,該隊列是否爲該通道獨佔的(其餘通道是否能夠消費該隊列)
#                       auto_delete=False, # boolean autoDelete,該隊列再也不使用的時候,是否讓RabbitMQ服務器自動刪除掉;
#                       arguments=None)

"""
chann.queue_declare(queue='test_tags', # 聲明 隊列, 不可與 已存在的 隊列重名 , 不然 報錯
                    durable=True,      # 設置隊列 持久化 , 報 : ChannelClosedByBroker: 406 , 錯誤, passive:是屈服的意思,將passive設爲True,問題解決。
                    # passive= True,
                    )
message = "My name is shiwei"

chann.basic_publish(exchange='',
                    routing_key='test_tags',                              # 代表 要將 消息 發送到哪一個隊列
                    body = message,
                    properties = pika.BasicProperties(delivery_mode = 2)  # 設置消息持久化, 將消息的屬性設置爲 2 ,表示消息持久化
                    )

print('[Publisher] Send %s' % message)
conn.close()

消費者

import pika
import time

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

chann = conn.channel()

chann.queue_declare(queue='test_tags',  # 聲明 隊列, 不可與 已存在的 隊列重名 , 不然 報錯
                    durable=True,       # 設置隊列 持久化 ,
                    # passive=True,     # 是否檢查當前隊列 是否存在 , True 表示 當前聲明隊列 爲 存在 的,
                    )
# 定義 接受消息 的 回調函數
def callback(ch,method, properties, body):
    print(" [消費者] Received %r" % body)
    time.sleep(3)
    print(" [消費者] Done")
    # 手動 確認  在接收到 消息後 給 rabbitmq 發送一個 確認 ACK, 返回 消息標識符
    ch.basic_ack(delivery_tag=method.delivery_tag)
"""
    def basic_consume(self,
                      queue,
                      on_message_callback,
                      auto_ack=False,
                      exclusive=False,
                      consumer_tag=None,
                      arguments=None):
"""
chann.basic.qos (prefetch_count = 1 )
# 注意 源碼中的 位置參數的位置
chann.basic_consume(queue='test_tags',
                    on_message_callback = callback,
                    # 是否 須要 自動 確認, 若爲 False, 則須要在 消息回調函數中手動確認,
                    auto_ack = False,  # 默認是  False
                    )

chann.start_consuming()  # 開始 循環 接受消息

發佈-訂閱_廣播:一對多-------交換機

 
exchange:交換機。生產者不是將消息發送給隊列,而是將消息發送給交換機,由交換機決定將消息發送給哪一個隊列。因此 
     exchange必須準確知道消息是要送到哪一個隊列,仍是要被丟棄。所以要在exchange中給exchange定義規則,全部的規 
      則都是在exchange的類型中定義的。
exchange有4個類型:
     fanout : 全部bind到此exchange的queue均可以接收消息
     direct :經過routingKey和exchange決定的那個惟一的queue能夠接收消息
     topic  :全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息
          表達式符號說明:#表明一個或多個字符,*表明任何字符
              例:#.a會匹配a.a,aa.a,aaa.a等
                 *.a會匹配a.a,b.a,c.a等
                 注:使用RoutingKey爲#,Exchange Type爲topic的時候至關於使用fanout 
    headers :經過headers 來決定把消息發給哪些queue
   以前,咱們並無講過exchange,可是咱們仍然能夠將消息發送到隊列中。這是由於咱們用的是默認exchange.也就是 
        說以前寫的:exchange='',空字符串表示默認的exchange。

exchange_type=fanout

廣播類型,生產者將消息發送給全部消費者,若是某個消費者沒有收到當前消息,就再也收不到了(消費者就像收音機)
           生產者:(能夠用做日誌收集系統)
           開啓多個消費者後,會同時從生產者接收相同的消息

fanout

 

消息publisher

import pika

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在鏈接上建立一個頻道
channel = conn.channel()

#建立一個fanout(廣播)類型的交換機exchange,名字爲logs。
channel.exchange_declare(exchange="logs", exchange_type="fanout")
message =  "info: Hello World!"
channel.basic_publish(exchange='logs',# 指定交換機exchange爲logs,這裏只須要指定將消息發給交換機logs就能夠了,不須要指定隊列,由於生產者消息是發送給交換機的。
                      routing_key='', # 在fanout類型中,綁定關鍵字routing_key必須忽略,寫空便可
                      body=message)
print(" [x] Sent %r" % message)
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在鏈接上建立一個頻道
channel = conn.channel()

channel.exchange_declare(exchange="logs", exchange_type="fanout")  # 參數名改變了,之前版本是 type

result = channel.queue_declare(exclusive=True,  # 建立隨機隊列,exclusive=True(惟一性)當消費者與rabbitmq斷開鏈接時,這個隊列將自動刪除。
                               queue='',)

queue_name = result.method.queue # 分配隨機隊列的名字。
channel.queue_bind(exchange='logs',# 將交換機、隊列綁定在一塊兒,
                   queue=queue_name,)

def callback(ch, method, properties, body):   #  定義回調函數,接收消息
    print(" [消費者] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name,
                      on_message_callback = callback,
                      auto_ack=True) # 消費者接收消息後,不給rabbimq回執確認。

channel.start_consuming() # 循環等待消息接收。

exchange_type=direct

   RabbitMQ還支持根據關鍵字發送,無需聲明隊列,即:發佈時給隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 
        關鍵字 斷定應該將數據發送至指定隊列。
        例如: 根據 日誌級別,info, warning, error, success,本例便是。 
   注意: *****本例 需從命令行啓動,給定參數-------》隊列的綁定關鍵字
 

direct

消息publisher

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

#在鏈接上建立一個頻道
channel = conn.channel()

#建立一個交換機並聲明 direct 的類型爲:關鍵字類型,表示該交換機會根據消息中不一樣的關鍵字將消息發送給不一樣的隊列
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

severity = sys.argv[1] if len(sys.argv) > 1 else "info"

message = ' '.join(sys.argv[2:]) or "Hello World!"

channel.basic_publish(exchange='direct_logs', # 指明用於發佈消息的交換機、關鍵字
                      routing_key=severity,   # 綁定關鍵字,即將message與關鍵字info綁定,明確將消息發送到哪一個關鍵字的隊列中。
                      body=message)
print(" [x] Sent %r" % message)
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

# 在鏈接上建立一個頻道
channel = conn.channel()

channel.exchange_declare(exchange="direct_logs", exchange_type="direct")  # 參數 名改變了, 之前是 type

result = channel.queue_declare(exclusive=True,  # 建立隨機隊列,當消費者與rabbitmq斷開鏈接時,這個隊列將自動刪除。
                               queue='',)
queue_name = result.method.queue              # 分配隨機隊列的名字。

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:                   # 循環 隊列, 使其與交換機綁定在一塊兒,
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity,)

def callback(ch, method, properties, body):   # 定義回調函數,接收消息
    print(" [消費者] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name,
                      on_message_callback = callback,
                      auto_ack=True,)         # 消費者接收消息後,不給rabbimq回執確認。

channel.start_consuming()                     # 循環等待消息接收。

  

exchange_type=topic---> 模糊匹配類型。比較經常使用

 
  發送到一個 topics交換機的消息,它的 routing_key不能是任意的 -- 它的routing_key必須是一個用小數點分割的 
      單詞列表。 這個字符能夠是任何單詞,可是一般是一些指定意義的字符。好比: 
      「stock.usd.nyse","nyse.vmw","quick.orange.rabbit".  這裏能夠是你想要路由鍵的任意字符。最高限制 
       爲255字節。
  生產者與消費者的routing_key必須在同一個表單中。 Topic交換的背後的邏輯相似直接交換(direct) -- 包含特定關 
      鍵字的消息將會分發到全部匹配的關鍵字隊列中。而後有兩個重要的特殊狀況:
      綁定鍵值:
        > * (星)  可代替一個單詞
        > # (井) 可代替0個或多個單詞 
       注意: *****本例 需從命令行啓動,給定參數-------》隊列的綁定關鍵字

topic

消息publisher

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
conn.close()

消息subscriber

import pika
import sys

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True,
                               queue="",)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(on_message_callback = callback,
                      queue=queue_name,
                      auto_ack=True)

channel.start_consuming()
To receive all the logs run:
   python receive_logs_topic.py "#"
To receive all logs from the facility "kern":
   python receive_logs_topic.py "kern.*"
Or if you want to hear only about "critical" logs:
   python receive_logs_topic.py "*.critical"
You can create multiple bindings:
   python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key "kern.critical" type:
   python emit_log_topic.py "kern.critical" "A critical kernel error"

exchange_type=topic----例 二 :

生產者

 1 import pika
 2 import sys
 3 
 4 username = "shiwei"
 5 pwd = 'shiwei666666'
 6 user_pwd = pika.PlainCredentials(username, pwd)
 7 
 8 # 建立鏈接
 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
10 
11 channel = conn.channel()
12 channel.exchange_declare(exchange='topic_logs',
13                          exchange_type='topic')  # 建立模糊匹配類型的exchange。。
14 
15 routing_key = '[warn].kern' # 這裏關鍵字必須爲點號隔開的單詞,以便於消費者進行匹配。引伸:這裏能夠作一個判斷,判斷產生的日誌是什麼級別,而後產生對應的routing_key,使程序能夠發送多種級別的日誌
16 message =  'Hello World!'
17 channel.basic_publish(exchange='topic_logs',#將交換機、關鍵字、消息進行綁定
18                       routing_key=routing_key,  # 綁定關鍵字,將隊列變成[warn]日誌的專屬隊列
19                       body=message)
20 print(" [x] Sent %r:%r" % (routing_key, message))
21 conn.close()

 

消費者

 1 import pika
 2 import sys
 3 
 4 username = "shiwei"
 5 pwd = 'shiwei666666'
 6 user_pwd = pika.PlainCredentials(username, pwd)
 7 
 8 # 建立鏈接
 9 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
10 
11 channel = conn.channel()
12 
13 channel.exchange_declare(exchange='topic_logs',
14                          exchange_type='topic')  # 聲明exchange的類型爲模糊匹配。
15 
16 result = channel.queue_declare(exclusive=True,
17                                queue="",)        # 建立隨機一個隊列當消費者退出的時候,該隊列被刪除。
18 queue_name = result.method.queue                 # 建立一個隨機隊列名字。
19 
20 # 綁定鍵。‘#’匹配全部字符,‘*’匹配一個單詞。這裏列表中能夠爲一個或多個條件,能經過列表中字符匹配到的消息,消費者均可以取到
21 binding_keys = ['[warn].*', 'info.*']
22 if not binding_keys:
23     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
24     sys.exit(1)
25 
26 # 經過循環綁定多個「交換機-隊列-關鍵字」,只要消費者在rabbitmq中能匹配到與關鍵字相應的隊列,就從那個隊列裏取消息
27 for binding_key in binding_keys:
28     channel.queue_bind(exchange='topic_logs',
29                        queue=queue_name,
30                        routing_key=binding_key)
31 
32 print(' [*] Waiting for logs. To exit press CTRL+C')
33 
34 
35 def callback(ch, method, properties, body):
36     print(" [x] %r:%r" % (method.routing_key, body))
37 
38 
39 channel.basic_consume(on_message_callback=callback,
40                       queue=queue_name,
41                       auto_ack=True)       # 不給rabbitmq發送確認
42 
43 channel.start_consuming()                  # 循環接收消息

遠程過程調用(RPC)Remote procedure call

消息屬性
AMQP協議在一個消息中預先定義了一個包含14個屬性的集合。大部分屬性不多用到,如下幾種除外:
  > delivery_mode: 標記一個消息爲持久的(值爲2)或者 瞬時的(其它值), 你須要記住這個屬性(在第二課時用到過)
  > content_type : 用來描述 MIME 類型的編碼 ,好比咱們常用的 JSON 編碼,設置這個屬性就很是好實現: application/json
  > reply_to:reply_to沒有特別的意義,只是一個普通的變量名,只是它一般用來命名一個 callback 隊列
  > correlation_id : 用來關聯RPC的請求與應答。關聯id的做用:當在一個隊列中接收了一個返回,咱們並不清楚這個結果時屬於哪一個請求的,這樣當correlation_id屬性使用後,咱們爲每一個請求設置一個惟一值,這個值就是關聯id。這樣,請求會有一個關聯id,該請求的返回結果也有一個相同的關聯id。而後當咱們從callback隊列中接收到一個消息後,咱們查看一下這個關聯,基於這個咱們就能將請求和返回進行匹配。若是咱們看到一個未知的correlation_id值,咱們能夠直接丟棄這個消息 -- 它是不屬於咱們的請求。
RPC執行過程:
    >  當客戶端啓動後,它建立一個匿名的惟一的回調隊列
    > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來爲每一個請求設置一個惟一標識)
    > 請求發送到 rpc_queue隊列
    > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to字段中的隊列
    > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,若是和請求中的值匹配將返回給應用

rpc

RPC Running Detail:
    >  當客戶端啓動後,它建立一個匿名的惟一的回調隊列
    > 對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來爲每一個請求設置一個惟一標識)
    > 請求發送到 rpc_queue隊列
    > RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to字段中的隊列
    > 客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,若是和請求中的值匹配將返回給應用

RPC Client

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        username = "shiwei"
        pwd = 'shiwei666666'
        user_pwd = pika.PlainCredentials(username, pwd)

        # 建立鏈接
        self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

        self.channel = self.conn.channel()

        result = self.channel.queue_declare(exclusive=True, queue= '')  # 隨機生成 一個 queue , 用與 Server 發送消息
        self.callback_queue = result.method.queue

        self.channel.basic_consume(on_message_callback = self.on_response, auto_ack = True,  # 準備 發送 消息
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                                reply_to=self.callback_queue,
                                                correlation_id=self.corr_id,),
                                   body=str(n))
        while self.response is None:
            self.conn.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(7)
print(" [.] Got %r" % response)
Client Running Detail:
   > (11) 咱們創建一個鏈接,通道並定義一個專門的’callback‘隊列用來接收回復
   > (19) 咱們訂閱了「callback」隊列,所以咱們可以接收 RPC 的返回結果
   > (21) ’on_response'  在每一個返回中執行的回調是一個簡單的job, 對每一個返回消息將檢查correlation_id是不是咱們須要查找的那個ID,若是是,將保存結果到 self.response 並終端consuming循環
   > (25) 下一步,咱們定義咱們的main方法 - 執行實際的RPC請求
   > (27) 在這方法中,首先咱們生產一個惟一的 correlatin_id 號並保存 -- 'on_response"回調函數將用着號碼來匹配發送和接收的消息值
   > (28) 下一步,發佈請求信息,使用兩個屬性: reply_to 和 correlation_id
   > (34) 這一步咱們能夠坐等結果的返回
   > (36) 最後咱們返回結果給用戶

RPC Server

import pika

username = "shiwei"
pwd = 'shiwei666666'
user_pwd = pika.PlainCredentials(username, pwd)

# 建立鏈接
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))

channel = conn.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id= props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback = on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
Server Running Detail:
    >  當客戶端啓動後,它建立一個匿名的惟一的回調隊列
    >  對一個RPC請求, 客戶端發送一個消息包含兩個屬性: reply_to (用來設置回調隊列)和 correlation_id(用來爲每一個請求設置一個惟一標識)
    >  請求發送到 rpc_queue隊列
    >  RPC worker( 服務端) 在那個隊列中等待請求,當一個請求出現後,服務端就執行一個job並將結果消息發送給客戶端,使用reply_to字段中的隊列
    >  客戶端在callback 隊列中等待數據, 當一個消息出現後,檢查這個correlation_id屬性,若是和請求中的值匹配將返回給應用

RPC Demo02

處理方法描述:發送端在發送信息前,產生一個接收消息的臨時隊列,該隊列用來接收返回的結果。其實在這裏接收端、發送端 
    的概念已經比較模糊了,由於發送端也一樣要接收消息,接收端一樣也要發送消息,因此這裏筆者使用另外的示例來演示這一過程。
示例內容:假設有一個控制中心和一個計算節點,控制中心會將一個天然數N發送給計算節點,計算節點將N值加1後,返回給控 
    制中心。這裏用center.py模擬控制中心,compute.py模擬計算節點。

Client

 1 import pika
 2 
 3 class Center(object):
 4     def __init__(self):
 5         username = "shiwei"
 6         pwd = 'shiwei666666'
 7         user_pwd = pika.PlainCredentials(username, pwd)
 8 
 9         # 建立鏈接
10         self.conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
11 
12         self.channel = self.conn.channel()
13         # 定義接收返回消息的隊列
14         result = self.channel.queue_declare(exclusive=True,queue="",)
15         self.callback_queue = result.method.queue
16 
17         self.channel.basic_consume(on_message_callback=self.on_response,
18                                    auto_ack=True,
19                                    queue=self.callback_queue)
20 
21     # 定義接收到返回消息的處理方法
22     def on_response(self, ch, method, props, body):
23         self.response = body
24 
25     def request(self, n):
26         self.response = None
27         # 發送計算請求,並聲明返回隊列
28         self.channel.basic_publish(exchange='',
29                                    routing_key='compute_queue',
30                                    properties=pika.BasicProperties(
31                                        reply_to=self.callback_queue,
32                                    ),
33                                    body=str(n))
34         # 接收返回的數據
35         while self.response is None:
36             self.conn.process_data_events()
37         return int(self.response)
38 
39 center = Center()
40 
41 print(" [x] Requesting increase(30)")
42 response = center.request(30)
43 print(" [.] Got %r" % (response,))
Client

Server

 1 import pika
 2 
 3 username = "shiwei"
 4 pwd = 'shiwei666666'
 5 user_pwd = pika.PlainCredentials(username, pwd)
 6 # 建立鏈接
 7 conn = pika.BlockingConnection(pika.ConnectionParameters("localhost", credentials=user_pwd))
 8 channel = conn.channel()
 9 
10 print(' [*] Waiting for n')
11 channel.queue_declare(queue='compute_queue')
12 
13 # 將n值加1
14 def increase(n):
15     return n + 1
16 
17 # 定義接收到消息的處理方法
18 def request(ch, method, properties, body):
19     print(" [.] increase(%s)" % (body,))
20 
21     response = increase(int(body))
22 
23     # 將計算結果發送回控制中心
24     ch.basic_publish(exchange='',
25                      routing_key=properties.reply_to,
26                      body=str(response))
27     ch.basic_ack(delivery_tag=method.delivery_tag)
28 
29 channel.basic_qos(prefetch_count=1)
30 channel.basic_consume(on_message_callback=request,
31                       queue='compute_queue')
32 
33 channel.start_consuming()
Server

參考文章:http://www.rabbitmq.com/tutorials/tutorial-six-python.html

相關文章
相關標籤/搜索