RabbitMQ詳解

http://rabbitmq.mr-ping.com 中文文檔html

什麼叫消息隊列

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。python

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。web

爲什麼用消息隊列

從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?服務器

以常見的訂單系統爲例,用戶點擊【下單】按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。負載均衡

RabbitMQ 

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。異步

rabbitMQ是一款基於AMQP協議的消息中間件,它可以在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。並且兩端可使用不一樣的語言編寫,大大提供了靈活性。ide

 

中文文檔函數

rabbitMQ安裝

for Linux:

安裝配置epel源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安裝erlang
   $ yum -y install erlang
 
安裝RabbitMQ
   $ yum -y install rabbitmq-server
注意:service rabbitmq-server start/stop
for Mac:

bogon:~ yuan$ brew install rabbitmq
bogon:~ yuan$ export PATH=$PATH:/usr/local/sbin
bogon:~ yuan$ rabbitmq-server

rabbitMQ工做模型

簡單模式

示例

# ######################### 生產者 #########################
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

print(" [x] Sent 'Hello World!'")
connection.close()
# ########################## 消費者 ##########################
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
channel.basic_consume( callback,
                       queue='hello',
                       no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

相關參數

(1)no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。性能

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False

消息接收端應該這麼寫:fetch

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

(2)  durable  :消息不丟失

# 生產者
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()


# 消費者
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

(3) 消息獲取順序

默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()

# make message persistent
channel.queue_declare(queue='hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

exchange模型

3.1 發佈訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

exchange type = fanout
# 生產者
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()


# 消費者
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 3.2 關鍵字發送

 exchange type = direct

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
View Code

 3.3 模糊匹配

 exchange type = topic

發送者路由值              隊列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配

在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示能夠匹配 0 個 或 多個 單詞
  • *  表示只能匹配 一個 單詞

 示例:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

 基於RabbitMQ的RPC

Callback queue 回調隊列

一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to

Correlation id 關聯標識

一個客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端沒法辨別在回調隊列中的響應具體和那個請求時對應的。爲了處理這種狀況,客戶端在發送每一個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就能夠分辨此響應屬於哪一個請求。


客戶端發送請求:某個應用將請求信息交給客戶端,而後客戶端發送RPC請求,在發送RPC請求到RPC請求隊列時,客戶端至少發送帶有reply_to以及correlation_id兩個屬性的信息

服務器端工做流: 等待接受客戶端發來RPC請求,當請求出現的時候,服務器從RPC請求隊列中取出請求,而後處理後,將響應發送到reply_to指定的回調隊列中

客戶端接受處理結果: 客戶端等待回調隊列中出現響應,當響應出現時,它會根據響應中correlation_id字段的值,將其返回給對應的應用

服務器端

#!/usr/bin/env python
import pika

# 創建鏈接,服務器地址爲localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

# 創建會話
channel = connection.channel()

# 聲明RPC請求隊列
channel.queue_declare(queue='rpc_queue')

# 數據處理方法
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

# 對RPC請求隊列中的請求進行處理
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)

    # 調用數據處理方法
    response = fib(n)

    # 將處理結果(響應)發送到回調隊列
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 負載均衡,同一時刻發送給該服務器的請求不超過一個
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客戶端

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        」「」
        客戶端啓動時,建立回調隊列,會開啓會話用於發送RPC請求以及接受響應
        
        「」「
        
        # 創建鏈接,指定服務器的ip地址
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
                
        # 創建一個會話,每一個channel表明一個會話任務
        self.channel = self.connection.channel()
        
        # 聲明回調隊列,再次聲明的緣由是,服務器和客戶端可能前後開啓,該聲明是冪等的,屢次聲明,但只生效一次
        result = self.channel.queue_declare(exclusive=True)
        # 將次隊列指定爲當前客戶端的回調隊列
        self.callback_queue = result.method.queue
        
        # 客戶端訂閱回調隊列,當回調隊列中有響應時,調用`on_response`方法對響應進行處理; 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)


    # 對回調隊列中的響應進行處理的函數
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body


    # 發出RPC請求
    def call(self, n):
    
        # 初始化 response
        self.response = None
        
        #生成correlation_id 
        self.corr_id = str(uuid.uuid4())
        
        # 發送RPC請求內容到RPC請求隊列`rpc_queue`,同時發送的還有`reply_to`和`correlation_id`
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
                                   
        
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# 創建客戶端
fibonacci_rpc = FibonacciRpcClient()

# 發送RPC請求
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

簡介

RabbitMQ:接受消息再傳遞消息,能夠視爲一個「郵局」。發送者和接受者經過隊列來進行交互,隊列的大小能夠視爲無限的,多個發送者能夠發生給一個隊列,多個接收者也能夠從一個隊列中接受消息。

code

rabbitmq使用的協議是amqp,用於python的推薦客戶端是pika

pip install pika -i https://pypi.douban.com/simple/

生產者:send.py

import pika

# 創建一個鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 鏈接本地的RabbitMQ服務器
channel = connection.channel()  # 得到channel

這裏連接的是本機的,若是想要鏈接其餘機器上的服務器,只要填入地址或主機名便可。

接下來咱們開始發送消息了,注意要確保接受消息的隊列是存在的,不然rabbitmq就丟棄掉該消息

channel.queue_declare(queue='hello')  # 在RabbitMQ中建立hello這個隊列
channel.basic_publish(exchange='',  # 使用默認的exchange來發送消息到隊列
                  routing_key='hello',  # 發送到該隊列 hello 中
                  body='Hello World!')  # 消息內容

connection.close()  # 關閉 同時flush

RabbitMQ默認須要1GB的空閒磁盤空間,不然發送會失敗。

這時已在本地隊列hello中存放了一個消息,若是使用 rabbitmqctl list_queues 可看到

hello 1

說明有一個hello隊列 裏面存放了一個消息

消費者:receive.py

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

仍是先連接到服務器,和以前發送時相同

channel.queue_declare(queue='hello')  # 此處就是聲明瞭 來確保該隊列 hello 存在 能夠屢次聲明 這裏主要是爲了防止接受程序先運行時出錯

def callback(ch, method, properties, body):  # 用於接收到消息後的回調
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定隊列hello的消息
                      no_ack=True)  #在處理完消息後不發送ack給服務器
channel.start_consuming()  # 啓動消息接受 這會進入一個死循環

工做隊列(任務隊列)

工做隊列是用於分發耗時任務給多個工做進程的。不當即作那些耗費資源的任務(須要等待這些任務完成),而是安排這些任務以後執行。例如咱們把task做爲message發送到隊列裏,啓動工做進程來接受並最終執行,且可啓動多個工做進程來工做。這適用於web應用,即不該在一個http請求的處理窗口內完成複雜任務。

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))

分配消息的方式爲 輪詢 即每一個工做進程得到相同的消息數。

消息ack

若是消息分配給某個工做進程,可是該工做進程未處理完成就崩潰了,可能該消息就丟失了,由於rabbitmq一旦把一個消息分發給工做進程,它就把該消息刪掉了。

爲了預防消息丟失,rabbitmq提供了ack,即工做進程在收到消息並處理後,發送ack給rabbitmq,告知rabbitmq這時候能夠把該消息從隊列中刪除了。若是工做進程掛掉 了,rabbitmq沒有收到ack,那麼會把該消息 從新分發給其餘工做進程。不須要設置timeout,即便該任務須要很長時間也能夠處理。

ack默認是開啓的,以前咱們的工做進程顯示指定了no_ack=True

channel.basic_consume(callback, queue='hello')  # 會啓用ack

帶ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 發送ack

消息持久化

可是,有時RabbitMQ重啓了,消息也會丟失。可在建立隊列時設置持久化:
(隊列的性質一旦肯定沒法改變)

channel.queue_declare(queue='task_queue', durable=True)

同時在發送消息時也得設置該消息的持久化屬性:

channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))

可是,若是在RabbitMQ剛接收到消息還沒來得及存儲,消息仍是會丟失。同時,RabbitMQ也不是在接受到每一個消息都進行存盤操做。若是還須要更完善的保證,須要使用publisher confirm。

公平的消息分發

輪詢模式的消息分發可能並不公平,例如奇數的消息都是繁重任務的話,某些進程則會一直運行繁 重任務。即便某工做進程上有積壓的消息未處理,如不少都沒發ack,可是RabbitMQ仍是會按照順序發消息給它。能夠在接受進程中加設置:

channel.basic_qos(prefetch_count=1)

告知RabbitMQ,這樣在一個工做進程沒回發ack狀況下是不會再分配消息給它。

羣發

通常狀況下,一條消息是發送給一個工做進程,而後完成,有時想把一條消息同時發送給多個進程:

exchange

發送者是否是直接發送消息到隊列中的,事實上發生者根本不知道消息會發送到那個隊列,發送者只能把消息發送到exchange裏。exchange一方面收生產者的消息,另外一方面把他們推送到隊列中。因此做爲exchange,它須要知道當收到消息時它須要作什麼,是應該把它加到一個特殊的隊列中仍是放到不少的隊列中,或者丟棄。exchange有direct、topic、headers、fanout等種類,而羣發使用的即fanout。以前在發佈消息時,exchange的值爲 '' 即便用default exchange。

channel.exchange_declare(exchange='logs', type='fanout')  # 該exchange會把消息發送給全部它知道的隊列中

臨時隊列

result = channel.queue_declare()  # 建立一個隨機隊列
result = channel.queue_declare(exclusive=True)  # 建立一個隨機隊列,同時在沒有接收者鏈接該隊列後則銷燬它
queue_name = result.method.queue

這樣result.method.queue便是隊列名稱,在發送或接受時便可使用。

綁定exchange 和 隊列

channel.queue_bind(exchange='logs',
               queue='hello')

logs在發送消息時給hello也發一份。

在發送消息是使用剛剛建立的 logs exchange

channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)

路由

以前已經使用過bind,即創建exchange和queue的關係(該隊列對來自該exchange的消息有興趣),bind時可另外指定routing_key選項。

使用direct exchange

將對應routing key的消息發送到綁定相同routing key的隊列中

channel.exchange_declare(exchange='direct_logs',
                     type='direct')

發送函數,發佈不一樣severity的消息:

channel.basic_publish(exchange='direct_logs',
                  routing_key=severity,
                  body=message)

接受函數中綁定對應severity的:

channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key=severity)

使用topic exchange

以前使用的direct exchange 只能綁定一個routing key,可使用這種能夠拿.隔開routing key的topic exchange,例如:

"stock.usd.nyse" "nyse.vmw"

和direct exchange同樣,在接受者那邊綁定的key與發送時指定的routing key相同便可,另外有些特殊的值:

* 表明1個單詞
# 表明0個或多個單詞

若是發送者發出的routing key都是3個部分的,如:celerity.colour.species。

Q1:
*.orange.*  對應的是中間的colour都爲orange的

Q2:
*.*.rabbit  對應的是最後部分的species爲rabbit的
lazy.#      對應的是第一部分是lazy的

qucik.orange.rabbit Q1 Q2均可接收到,quick.orange.fox 只有Q1能接受到,對於lazy.pink.rabbit雖然匹配到了Q2兩次,可是隻會發送一次。若是綁定時直接綁定#,則會收到全部的。

RPC

在遠程機器上運行一個函數而後得到結果。

1、客戶端啓動 同時設置一個臨時隊列用於接受回調,綁定該隊列

 self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)

2、客戶端發送rpc請求,同時附帶reply_to對應回調隊列,correlation_id設置爲每一個請求的惟一id(雖說能夠爲每一次RPC請求都建立一個回調隊列,可是這樣效率不高,若是一個客戶端只使用一個隊列,則須要使用correlation_id來匹配是哪一個請求),以後阻塞在回調隊列直到收到回覆

注意:若是收到了非法的correlation_id直接丟棄便可,由於有這種狀況--服務器已經發了響應可是還沒發ack就掛了,等一會服務器重啓了又會從新處理該任務,又發了一遍相應,可是這時那個請求已經被處理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 發出調用

while self.response is None:  # 這邊就至關於阻塞了
    self.connection.process_data_events()  # 查看回調隊列
return int(self.response)

三、請求會發送到rpc_queue隊列
四、RPC服務器從rpc_queue中取出,執行,發送回覆

channel.basic_consume(on_request, queue='rpc_queue')  # 綁定 等待請求

# 處理以後:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 發送回覆到回調隊列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 發送ack

五、客戶端從回調隊列中取出數據,檢查correlation_id,執行相應操做

if self.corr_id == props.correlation_id:
        self.response = body
相關文章
相關標籤/搜索