RabbitMQ核心概念篇

 RabbitMQ介紹

1、RabbitMQ使用場景

RabbitMQ他是一個消息中間件,說道消息中間件【最主要的做用:信息的緩衝區】仍是的從應用場景來看下:html

一、系統集成與分佈式系統的設計python

    各類子系統經過消息來對接,這種解決方案也逐步發展成一種架構風格,即「經過消息傳遞的架構」。程序員

    舉個例子:如今醫院有兩個科「看病科」和「住院科」在以前他們之間是沒有任何關係的,若是你在「看病課」看完病後註冊的信息和資料,到住院科後還得從新註冊一遍?那如今要改革,你看完病後能夠直接去住院科那兩個系統之間須要打通怎麼辦?這裏就可使用咱們的消息中間件了。數據庫

二、異步任務處理結果回調的設計json

    舉個例子:記錄日誌,假如須要記錄系統中全部的用戶行爲日誌,若是經過同步的方式記錄日誌勢必會影響系統的響應速度,當咱們將日誌消息發送到消息隊列,記錄日誌的子系統就會經過異步的方式去消費日誌消息。這樣不須要同步的寫入日誌了NICE後端

三、併發請求的壓力高可用性設計服務器

    舉個例子:好比電商的秒殺場景。當某一時刻應用服務器或數據庫服務器收到大量請求,將會出現系統宕機。若是可以將請求轉發到消息隊列,再由服務器去消費這些消息將會使得請求變得平穩,提升系統的可用性。網絡

2、RabbitMQ的介紹

從上面的目前經常使用的消息中間件來講,感受Kafka更好些,沒錯Kafka是大數據時代誕生的消息中間件,但對於目前來講使用最廣的仍是RabbitMQ 若是對Kafka感興趣的能夠看下個人另外一片博客:http://www.cnblogs.com/luotianshuai/p/5206662.html(很是基礎的想看深刻的能夠去官網)架構

**Advanced Message Queuing Protocol (高級消息隊列協議** The Advanced Message Queuing Protocol (AMQP):併發

是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。

3、RabbitMQ和通常的消息傳輸模式:隊列模式&主題模式區別

一、隊列模式

 

一個發佈者發佈消息,下面的接收者按隊列順序接收,好比發佈了10個消息,兩個接收者A,B那就是A,B總共會收到10條消息,不重複。

 

 二、主題模式

 

對於Topic模式,一個發佈者發佈消息,有兩個接收者A,B來訂閱,那麼發佈了10條消息,A,B各收到10條消息。

 

三、RabbitMQ的模式

生產者生產消息後不直接直接發到隊列中,而是發到一個交換空間:Exchange,Exchange會根據Exchange類型和Routing Key來決定發到哪一個隊列中,這個講到發佈訂閱在詳細來看

RabbitMQ安裝配置

1、環境安裝

#安裝epel源[EPEL (Extra Packages for Enterprise Linux,企業版Linux的額外軟件包) 是Fedora小組維護的一個軟件倉庫項目爲RHEL/CentOS提供他們默認不提供的軟件.]
rpm -Uvh https://dl.fedoraproject.org/pub/epel/epel-release-latest-6.noarch.rpm
# 擴展erlang源安裝最新的erlang
rpm -Uvh http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
yum -y install erlang
# 安裝RabbitMQ
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.6.1-1.noarch.rpm

各位看官若是想經過源碼安裝「搜狗一下就知道」這裏不在過多的複述

2、用戶配置

# 啓動服務
/etc/init.d/rabbitmq-server start
# 添加用戶
rabbitmqctl add_user admin admin
# 添加管理員權限
rabbitmqctl set_user_tags admin administrator
# 修改密碼 
abbitmqctl add_user admin youpassword
# 設置權限
rabbitmqctl  set_permissions  -p  '/'  admin '.' '.' '.'

3、啓用WEB管理

# 啓動Web插件
rabbitmq-plugins enable rabbitmq_management
# 刪除guest用戶
rabbitmqctl delete_user guest
# 添加Web訪問權限
"""注意:rabbitmq從3.3.0開始禁止使用guest/guest權限經過除localhost外的訪問。若是想使用guest/guest經過遠程機器訪問,須要在rabbitmq配置文件中(/etc/rabbitmq/rabbitmq.config)中設置loopback_users爲[],配置文件不存在建立便可。"""
# 添加配置
[{rabbit, [{loopback_users, ["admin"]}]}].

RabbitMQ的消息玩法-Python

0、各位看官附帶視頻講解鏈接有須要的能夠看下

Python-RabbitMQ核心玩法

一、最簡單的玩法-生產者消費者

send.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
生產者/發送消息方
"""
import pika

# 遠程主機的RabbitMQ Server設置的用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials))
"""
A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts? 
Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety.
So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc. 
is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named 「/」. 
If that’s all you need, you’re ready to roll.

virtualHost is used as a namespace
for AMQP resources (default is \"/\"),so different applications could use multiple virtual hosts on the same AMQP server

[root@localhost ~]# rabbitmqctl list_permissions
Listing permissions in vhost "/" ...
admin    .    .    .
guest    .*    .*    .*
...done.


ConnectionParameters 中的參數:virtual_host 注:
    至關於在rabbitmq層面又加了一層域名空間的限制,每一個域名空間是獨立的有本身的Echange/queues等
    舉個好玩的例子Redis中的db0/1/2相似
    

"""
# 建立通道
channel = connection.channel()
# 聲明隊列hello,RabbitMQ的消息隊列機制若是隊列不存在那麼數據將會被丟掉,下面咱們聲明一個隊列若是不存在建立
channel.queue_declare(queue='hello')
# 給隊列中添加消息
channel.publish(exchange="",
                routing_key="hello",
                body="Hello World")
print("向隊列hello添加數據結束")
# 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉通道
channel.close()

receive.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
消費者/接收消息方
"""
import pika

# 遠程主機的RabbitMQ Server設置的用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials))

# 建立通道
channel = connection.channel()
# 聲明隊列
channel.queue_declare(queue='hello')
"""
你可能會問爲何咱們還要聲明隊列呢? 咱們在以前代碼裏就有了,可是前提是咱們已經知道了咱們已經聲明瞭代碼,可是咱們可能不太肯定
誰先啓動~ So若是大家100%肯定也能夠不用聲明,可是在不少狀況下生產者和消費者都是分離的.因此聲明沒有壞處
"""


# 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 定義通道消費者參數
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 開始接收信息,並進入阻塞狀態,隊列裏有信息纔會調用callback進行處理。按ctrl+c退出。
channel.start_consuming()

 二、升級點玩法-工做隊列

爲了好理解仍是把我們的玩的東西應用到實際生活場景中好比:寫日誌同步慢、或者請求量寫日誌機器扛不住怎麼辦?異步、而且分擔日誌記錄壓力到多臺服務器上.

相似上面的圖的效果:

Work Queue背後的主要思想是避免當即執行資源密集型任務的時,須要等待其餘任務完成。因此咱們把任務安排的晚一些,咱們封裝一個任務到消息中並把它發送到隊列,一個進程運行在後端發送並最終執行這個工做,當你運行多個消費者的時候這個任務將在他們之間共享。

send.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
生產者/發送方
"""
import sys
import pika

# 遠程主機的RabbitMQ Server設置的用戶名密碼
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.31.123', 5672, '/', credentials))

# 建立通道
channel = connection.channel()

# 聲明隊列task_queue,RabbitMQ的消息隊列機制若是隊列不存在那麼數據將會被丟掉,下面咱們聲明一個隊列若是不存在建立
channel.queue_declare(queue='task_queue')

# 在隊列中添加消息
for i in range(100):
    message = '%s Meassage '% i or "Hello World!"
    # 發送消息
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                )

    # 發送消息結束,並關閉通道
    print(" [x] Sent %r" % message)

channel.close()

receive1.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
消費者/接收方
"""
import time
import pika

# 認證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))
# 創建通道
channel = connection.channel()
# 建立隊列
channel.queue_declare("task_queue")


# 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

# 定義通道消費者參數
channel.basic_consume(callback,
                      queue="task_queue",
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 開始接收信息,並進入阻塞狀態,隊列裏有信息纔會調用callback進行處理。按ctrl+c退出。
channel.start_consuming()

receive2.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
消費者/接收方
"""
import time
import pika

# 認證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))
# 創建通道
channel = connection.channel()
# 建立隊列
channel.queue_declare("task_queue")


# 訂閱的回調函數這個訂閱回調函數是由pika庫來調用的
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body.count(b'.'))
    time.sleep(body.count(b'.'))
    print(" [x] Done")

# 定義通道消費者參數
channel.basic_consume(callback,
                      queue="task_queue",
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 開始接收信息,並進入阻塞狀態,隊列裏有信息纔會調用callback進行處理。按ctrl+c退出。
channel.start_consuming()

默認RabbitMQ按照順序發送每個消息,每一個消費者會得到相同的數量消息,這種分發消息的方式稱之爲循環。

三、持久化和公平分發

一、消息持久化

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。 這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…千面是由於咱們在消費者端標記了ACK=True關閉了它們,若是你沒有增長ACK=True或者沒有回執就會出現這個問題

生產者須要在發送消息的時候標註屬性爲持久化

# 在隊列中添加消息
for i in range(100):
    message = '%s Meassage '% i or "Hello World!"
    # 發送消息
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(delivery_mode=2, ))  # 標記屬性消息爲持久化消息須要客戶端應答

    # 發送消息結束,並關閉通道
    print(" [x] Sent %r" % message)

消費者須要發送消息回執

# 訂閱回調函數,這個訂閱回調函數是由pika庫來調用
def callback(ch, method, properties, body):
    """
    :param ch: 通道對象
    :param method: 消息方法
    :param properties: 
    :param body: 消息內容
    :return: None
    """
    print(" [x] Received %r" % (body,))
    time.sleep(2)
    print(" [x] Done")
    # 發送消息確認,確認交易標識符
    ch.basic_ack(delivery_tag=method.delivery_tag)

咱們能夠經過命令查看那些消費者沒有回覆ack確認

# Linux
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

二、隊列持久化

若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。 這裏咱們須要修改下生產者和消費者設置RabbitMQ消息的持久化**[生產者/消費者]都須要配置**

channel.queue_declare(queue='task_queue', durable=True)  # 隊列持久化

三、公平分發

默認狀況下RabitMQ會把隊列裏面的消息當即發送到消費者,不管該消費者有多少消息沒有應答,也就是說即便發現消費者來不及處理,新的消費者加入進來也沒有辦法處理已經堆積的消息,由於那些消息已經被髮送給老消費者了。相似下面的

在消費者中增長:`channel.basic_qos(prefetch_count=1)`

prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息尚未ack,則該consumer將block掉,直到有消息ack。 這樣作的好處是,若是系統處於高峯期,消費者來不及處理,消息會堆積在隊列中,新啓動的消費者能夠立刻從隊列中取到消息開始工做。

工做過程以下:

1. 消費者1接收到消息後處理完畢發送了ack並接收新的消息並處理

2. 消費者2接收到消息後處理完畢發送了ack並接收新的消息並處理

3. 消費者3接收到消息後一直處於消息中並無發送ack不在接收消息一直等到消費者3處理完畢後發送ACK後再接收新消息

發佈與訂閱-高級玩法

發佈與訂閱這裏一樣是拿兩個好玩的實際例子:咱們來寫一個日誌系統

在前面咱們學了work Queue它主要是把每一個任務分給一個worker[工做者]接下來咱們要玩些不一樣的,把消息發多個消費者(不一樣的隊列中). 這個模式稱之爲「發佈訂閱」

舉個例子咱們將建立一個簡單的日誌系統,包含兩個程序第一個是用來發送日誌,第二個是用來接收日誌,接收日誌的程序每個副本都將收到消息,**這樣咱們能夠一個接收器用來寫入磁盤,一個接收器用來輸入到日誌~ cool~**

Exchanges可用的類型不多:**direct, topic, headers, fanout**---4種,咱們先看最後一種

 

一、fanout模式

模式特色:

  • 能夠理解他是一個廣播模式
  • 不須要routing key它的消息發送時經過Exchange binding進行路由的~~在這個模式下routing key失去做用
  • 這種模式須要提早將Exchange與Queue進行綁定,一個Exchange能夠綁定多個Queue,一個Queue能夠同多個Exchange進行綁定
  • 若是接收到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。

send.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
import sys
import time
import pika

# 遠程主機的RabbitMQ Server設置的用戶名密碼
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

# 建立通道
channel = connection.channel()

# 聲明Exchanges
channel.exchange_declare(exchange="logs",
                         exchange_type="fanout")
"""
這裏能夠看到咱們創建鏈接後,就聲明瞭Exchange,由於把消息發送到一個不存在的Exchange是不容許的,
若是沒有消費者綁定這個,Exchange消息將會被丟棄這是能夠的由於沒有消費者
"""
# 添加消息
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange="logs",  # 將消息發送至Exchange爲logs綁定的隊列中
                      routing_key="",
                      body=message,)

print(" [x] Sent %r" % message)
# 關閉通道
connection.close()

receive.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
import pika

# 鏈接RabbitMQ驗證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

# 創建通道
channel = connection.channel()

# Bindings Exchanges 接收消息
channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")
# 使用隨機隊列/並標註消費者斷開鏈接後刪除隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Queue 與 Exchanges綁定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


# 定義回調函數這個回調函數將被pika庫調用
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

# 定義接收消息屬性
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

# 開始接收消息
channel.start_consuming()

測試:那如今我就能夠把信息分開來記錄了

如今若是你想把日誌存入文件

python fanout_receive1.py > fanout_receive_log.txt

而且想再屏幕也輸出,在打開一個窗口

python fanout_receive1.py

~~ Cool~

概念解釋

binding

當咱們建立了Exchanges和(QUEUE)隊列後,咱們須要告訴Exchange發送到們的Queue隊列中,所須要須要把Exchange和隊列(Queue)進行綁定,

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

二、Direct 模式

任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue

1. 通常狀況可使用rabbitMQ自帶的Exchange:」」 (該Exchange的名字爲空字符串), 也能夠自定義Exchange

2. 這種模式下不須要將Exchange進行任何綁定(bind)操做。固然也能夠進行綁定。能夠將不一樣的routing_key與不一樣的queue進行綁定,不一樣的queue與不一樣exchange進行綁定

3. 消息傳遞時須要一個「routing_key」

4. 若是消息中不存在routing_key中綁定的隊列名,則該消息會被拋棄。

若是一個exchange 聲明爲direct,而且bind中指定了routing_key,那麼發送消息時須要同時指明該exchange和routing_key.

簡而言之就是:生產者生成消息發送給Exchange, Exchange根據Exchange類型和basic_publish中的routing_key進行消息發送 消費者:訂閱Exchange並根據Exchange類型和binding key(bindings 中的routing key) ,若是生產者和訂閱者的routing_key相同,Exchange就會路由到那個隊列。

老規矩仍是經過實例來講:

在上面的文檔中咱們建立了一個簡單的日誌系統,咱們把消息發給全部的訂閱者 在下面的內容中將把特定的消息發給特定的訂閱者,舉個例子來講,把error級別的報警寫如文件,並把全部的報警打印到屏幕中,進行了路由的規則相似下面的架構

這裏也要注意一個routing key 是能夠綁定多個隊列的

在上面咱們已經建立過bindings了相似下面

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

消費者端:Bindings能夠增長routing_key 這裏不要和basic_publish中的參數弄混了,咱們給它稱之爲**binding key**

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

 binding key的含義依賴於Exchange類型,fanout exchanges類型只是忽略了它

emit_log_direct.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
生產者/發送方
"""
import pika
import sys

# 添加RabbitMQ Server端認證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

# 建立通道
channel = connection.channel()
# 聲明Exchange 且類型爲 direct
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

# 從調用參數中或去類型
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# 發送消息指定Exchange爲direct_logs,routing_key 爲調用參數獲取的值
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
# 打印信息
print(" [x] Sent %r:%r" % (severity, message))
# 關閉通道
connection.close()

receive_logs_direct.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
消費者/接收方
"""
import pika
import sys

# 添加RabbitMQ Server端認證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

# 建立通道
channel = connection.channel()
# binding Exchange 爲direct_logs, 且Exchange類型爲direct
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

# 生成隨機接收隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 接收監聽參數
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# 從循環參數中獲取routing_key,並綁定
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


# 定義回調參數
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

# 定義通道參數
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
# 開始接收消息
channel.start_consuming()

測試接收error 寫入到文件

python receive_logs_direct.py error > logs_from_rabbit.log

測試接收全部級別的報警輸出值公屏

python receive_logs_direct.py info warning error

測試發送消息

python emit_log_direct.py error "Run. Run. Or it will explode."

三、topic類型

前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。

topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,

它約定:

  • routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
  • binding key與routing key同樣也是句點號「. 」分隔的字符串
  • binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)

以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。

RPC

MQ自己是基於異步的消息處理,前面的示例中全部的生產者(P)將消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。

但實際的應用場景中,咱們極可能須要一些同步處理,須要同步等待服務端將個人消息處理完成後再進行下一步處理。這至關於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。

 

一、RabbitMQ實現RPC機制是

  • 客戶端發送請求(消息)時,在消息的屬性(MessageProperties,在AMQP協議中定義了14種properties,這些屬性會隨着消息一塊兒發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知個人消息發送到這個Queue中)和correlationId(這次請求的標識號,服務器處理完成後須要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗)
  • 服務器端收到消息並處理
  • 服務器端處理完消息後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性
  • 客戶端以前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理

雖然RPC在計算中是一個很常見的模式,但它常常受到批評。當程序員不知道函數調用是本地的仍是慢RPC時,問題就出現了。這樣的混淆致使了不可預測的系統,並增長了調試的沒必要要的複雜性。

與簡化軟件不一樣,誤用的RPC可能致使沒法維護的面代碼。 >

記住上面幾點問題,考慮下面幾個建議

  • - 確保調用的函數是本地的仍是遠程的
  • - 記錄您的系統。明確組件之間的依賴關係
  • - 處理錯誤案例。當RPC服務器關閉很長時間時,客戶端應該如何反應?

若是對RPC有不少疑問,若是能夠的話最好使用異步管道 通常來講在RabbitMQ執行RPC是很簡單的,客戶端發送請求服務端響應消息,爲了接收響應,客戶端須要發送一個「回調」隊列地址。

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

AMQP 0-9-1 協議預先定義了14種屬性,除了如下幾種以外經常使用其餘的不常常用

  • - delivery_mode: 標記消息持久化,值爲2的時候爲持久化其餘任何值都是瞬態的
  • - content_type: 用來描述mime-type編碼,舉個例子來講常用JSON編碼的話將此屬性設置爲:application/json.
  • - reply_to:一般用於命名回調隊列
  • - correlation_id:將RCP請求和響應進行關聯的id

Correlation id

上面的描述中咱們建議爲每一個RPC建立一個隊列可是這個很低效,幸運的是咱們有更好的辦法:爲每一個客戶端建立一個隊列 可是這就會觸發一個新的問題,咱們不肯定這個消息是哪一個返回的這裏就用到了 Correlation id 咱們給每一個請求設置一個惟一值,最後當咱們收到消息在這個callback Queue中,咱們查看這個屬性和請求的屬性(Correlation_id)進行匹配若是沒有匹配上,咱們將拒絕這個消息它不是咱們的~

您可能會問,爲何咱們應該忽略回調隊列中的未知消息,而不是錯誤地失敗呢?這是因爲服務器端可能出現競態條件。雖然不太可能,可是RPC服務器可能在發送了答案後纔會死亡,但在發送請求消息以前。

若是發生這種狀況,從新啓動的RPC服務器將再次處理此請求。這就是爲何在客戶端咱們必須優雅地處理重複的響應,而RPC應該是冪等()的。

RPC冪等性:

  •  f(x)=f(f(x))
  • 若是消息具備操做冪等性,也就是一個消息被應用屢次與應用一次產生的效果是同樣的

二、工做機制

- 當客戶端啓動會建立一個匿名回調隊列

- 在RCP請求Client發送兩個屬性:reply_to 標記callback隊列,correlation_id 每一個請求的惟一值,這個請求是被髮送到rpc_queue 中

- 這個RPC worker (aka: server)等待請求在這個rpc_queue中,當請求出現它執行任務並將結果返回給客戶端,發送到那個隊列呢?就是咱們reply_to標記的隊列,客戶端等待數據返回在這個callback隊列,當消息出現 檢查correlation_id 屬性是不是和請求的若是匹配進行相應

rpc_server.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
RPC/Server端
"""
import pika
# 添加認證信息
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))

# 添加一個通道
channel = connection.channel()
# 添加一個隊列,這個隊列在Server就是咱們監聽請求的隊列
channel.queue_declare(queue='rpc_queue')


# 斐波那契計算
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


#  應答函數,它是咱們接受到消息後如處理的函數替代原來的callback
def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     # 返回的隊列,從屬性的reply_to取出來
                     routing_key=props.reply_to,
                     # 添加correlation_id,和Client進行一致性匹配使用的
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     # 相應消息/咱們的處理結果
                     body=str(response))
    # 增長消息回執
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 每次預處理的請求個數/這個請求我沒有處理完不要給我發了
channel.basic_qos(prefetch_count=1)
# 定義接收通道的屬性/定義了callback方法,接收的隊列,返回的隊列在哪裏?on_request 的routing_key=props.reply_to
channel.basic_consume(on_request, queue='rpc_queue')

# 開始接收消息
print(" [x] Awaiting RPC requests")
channel.start_consuming()

這個Server端的代碼是很是簡單的

- 往常同樣咱們先建立鏈接並聲明隊列 - 咱們聲明咱們的斐波那契額函數,它只假設有效的正整數輸入。(不要指望這個可以爲大數據工做,它多是最慢的遞歸實現)。

- 咱們聲明瞭callback在basic_consume,它是RCP核心請求過來執行callback,它來工做併發送相應

- 咱們可能但願運行多個服務器進程。爲了在多個服務器上平均分配負載,咱們須要設置prefetch_count設置

rpc_client.py

# !/usr/bin/env python3.5
# -*- coding:utf-8 -*-
# __author__ == 'LuoTianShuai'
"""
RPC/Client端
"""
import pika
import uuid


# 定義菲波那切數列RPC Client類調用RPC Server
class FibonacciRpcClient(object):
    def __init__(self):
        # 添加認證信息
        credentials = pika.PlainCredentials("admin", "admin")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.31.123", 5672, "/", credentials))
        # 添加一個通道
        self.channel = self.connection.channel()
        # 生成一個隨機隊列-定義callback回調隊列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        # 定義回調的通道屬性
        self.channel.basic_consume(self.on_response,  # 回調結果執行完執行的Client端的callback方法
                                   no_ack=True,
                                   queue=self.callback_queue)
        # 這裏注意咱們並無直接阻塞的開始接收消息了

    # Client端 Callback方法
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            # 定義了self.response = body
            self.response = body

    def call(self, n):
        # 定義了一個普通字段
        self.response = None
        # 生成了一個uuid
        self.corr_id = str(uuid.uuid4())
        # 發送一個消息
        self.channel.basic_publish(exchange='',   # 使用默認的Exchange,根據發送的routing_key來選擇隊列
                                   routing_key='rpc_queue',  # 消息發送到rpc_queue隊列中
                                   # 定義屬性
                                   properties=pika.BasicProperties(
                                         reply_to=self.callback_queue,  # Client端定義了回調消息的callback隊列
                                         correlation_id=self.corr_id,  # 惟一值用來作什麼的?request和callback 匹配用
                                         ),
                                   body=str(n))

        # 開始循環 咱們剛纔定義self.response=None當不爲空的時候中止`
        while self.response is None:
            # 非阻塞的接受消息
            self.connection.process_data_events(time_limit=3)
        return int(self.response)

# 實例化對象
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# 發送請求計算菲波那切數列 30
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

- 咱們創建一個鏈接,通道,並聲明一個排他的「回調」隊列做爲回覆

- 咱們訂閱這個callback隊列,因此咱們能夠接收RPC Server消息

- 每一個相應回來以後執行很是簡單的on_response方法,每一個相應來後檢查下correlation_id是不是匹配的,保存相應並退出接收循環

- 接下來,咱們定義主調用方法——它執行實際的RPC請求。

- 咱們定義了uuid,並在消息發送時添加了兩個屬性:reply_to、correlation_id

概念彙總

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket連接,它封裝了socket協議相關部分邏輯。

ConnectionFactory爲Connection的製造工廠。

Channel是咱們與RabbitMQ打交道的最重要的一個接口,咱們大部分的業務操做是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發佈消息等。

exchange

實際上生產者不會直接把消息放到消息隊列裏,實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)

queue

Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,用下圖表示。

RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)能夠從Queue中獲取消息並消費。

Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除;若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。 這裏不存在timeout概念,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。

**這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多;消費者重啓後會重複消費這些消息並重復執行業務邏輯…**

Message durability 

若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。

Prefetch count

生產者在將消息發送給Exchange的時候,通常會指定一個routing key,來指定這個消息的路由規則,而這個routing key須要與Exchange Type及binding key聯合使用才能最終生效。 在Exchange Type與binding key固定的狀況下(在正常使用時通常這些內容都是固定配置好的),咱們的生產者就能夠在發送消息給Exchange時,經過指定routing key來決定消息流向哪裏。 RabbitMQ爲routing key設定的長度限制爲255 bytes。

Binding

RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。 在綁定(Binding)Exchange與Queue的同時,通常會指定一個binding key;生產者將消息發送給Exchange時,通常會指定一個routing key;當binding key與routing key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。

在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的binding key。 binding key 並非在全部狀況下都生效,它依賴於Exchange Type,好比fanout類型的Exchange就會無視binding key,而是將消息路由到全部綁定到該Exchange的Queue。

視頻講解

點我跳轉

相關文章
相關標籤/搜索