rabbitMq API

1.RabbitMQ介紹

1.1python的Queue和RabbitMQ

python消息隊列:
線程queue(同一進程之間進行交互)
進程queue(父子進程進行交互或同一個進程下的多個子進程進行交互)

兩個徹底獨立的python程序:
是不能用上面的queue進行交互的,或者和其餘語言交互的方式有哪些呢?
1.Disk:能夠把數據寫入磁盤
2.Socket通訊
3.消息中間件:RabbitMQ,ZeroMQ,ActiveMQ等。

1.2消息隊列的應用場景

1.2.1異步處理

場景說明:
用戶註冊後,須要發送註冊右鍵和註冊短信。
傳統方式有兩種:
1.串行方式
2.並行方式
串行方式:
將註冊信息寫入數據庫成功後,發送註冊郵件,再發送註冊短信。
以上三個任務所有完成後,返回給客戶端

rabbitMq API

並行方式:
將註冊消息寫入數據庫成功後,發送註冊郵件的同時,發送註冊短信。
以上三個任務完成後,返回給客戶端。
與串行的差異是,並行的方式能夠提升處理的時間。

rabbitMq API

假設三個業務節點每一個使用50ms,不考慮網絡等其餘開銷,串行的方式的時間是150毫秒,並行的時間多是100毫秒。
由於CPU在單位時間內處理的請求數是必定的,假設CPU1秒內吞吐量是100次。
則串行方式1秒內可處理的請求量是7次(1000/150)。
並行方式處理的請求量是10次(1000/100)。
小節:
傳統的方式系統的性能(併發量,吞吐量,響應時間)會有瓶頸。如何解決呢?
引入消息隊列:
改造後的架構以下

rabbitMq API

按照上圖,用戶的響應時間至關於註冊信息寫入數據庫的時間,也就是50毫秒。
註冊郵件,發送短信寫入消息隊列後,直接返回。所以寫入消息隊列的速度很快,基本能夠忽略,所以用戶的響應時間多是50毫秒。
所以架構改變後,系統的吞吐量提升到美妙20QPS。比串行提升了3倍,比並行提升了2倍。

1.2.2應用解耦

場景說明:
用戶下單後,DD系統須要通知KC系統。傳統的作法是,DD系統調用KC系統的接口。

rabbitMq API

傳統方式的缺點:
加入KC系統沒法訪問,則DD庫存將失敗,從而致使DD失敗,兩統耦合。

rabbitMq API

使用消息隊列後:
訂單系統:
用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶下單成功。

庫存系統:
訂閱下單的消息,獲取下單信息,庫存系統根據下單信息,進行庫存操做。

假如:
在下單時庫存系統不能使用,也不會影響下單,由於下單後,訂單系統寫入消息隊列後,就再也不關心其餘的後續操做了。
實現了訂單系統與庫存系統的應用解耦。

1.2.3流量削鋒

流量削鋒也是消息隊列中的經常使用場景,通常在秒殺或團搶活動中使用普遍。
應用場景:
秒殺活動,通常會覺得流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。
1.能夠控制活動的人數。
2.能夠緩解短期內高流量壓垮應用

rabbitMq API

用戶的請求,服務器接收後,首先寫入消息隊列。
假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。
秒殺業務根據消息隊列中的請求信息,再作後續處理。

1.2.4消息通信

rabbitMq API

2.RabbitMQ基本示例

2.1單發送單接收 - 生產者消費者模型

rabbitMq API

生產者send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份
credentials = pika.PlainCredentials("admin","123456")
# 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
# 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接
channel = connection.channel()
# 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」
channel.queue_declare(queue='SH')
# 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),
# 它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
channel.basic_publish(exchange='', routing_key='SH', body='武松又去打老虎啦2')
print("已經發送了消息")
# 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
connection.close()
生產者發送完消息,就結束了,就能夠處理其餘程序了

rabbitMq API

消費者receive.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="SH")

def callback(ch,method,properties,body):
    print("消費者接收到了任務:%r"%body.decode("utf8"))
    # 有消息來臨,當即執行callback,沒有消息則夯住,等待消息
    # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳
# def basic_consume(self,
#                       queue,
#                       on_message_callback,
#                       auto_ack=False,
#                       exclusive=False,
#                       consumer_tag=None,
#                       arguments=None):
# 這個參數的調用有所改動
# 第一個參數是隊列
# 第二個是回調函數
# 第三個這是auto_ack=True
channel.basic_consume("SH",callback,True)
# 開始消費,接收消息
channel.start_consuming()
消費者會阻塞在這裏,一直等待消息,隊列中有消息了,就執行回調函數

rabbitMq API

rabbitMq API

停掉消費者端,發送多個消息,再次查看

rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API

2.2rabbitmq消息確認之ack

默認狀況下,auto_ack=True,
生產者發送數據給隊列,消費者取出消息後,數據將會被刪除。
特殊狀況,若是消費者處理過程當中,出現錯誤,數據處理沒有完成,那麼該數據將從隊列中丟失。
ACK機制用於保證消費者若是拿了隊列的消息,客戶端處理時出錯了,那麼隊列中仍然存在這個消息,提供下一位消費者繼續取

不確認機制:
即每次消費者接收到數據後,不論是否處理完成,rabbitmq-server都會把這個消息標記完成,從隊列中刪除。
send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
# 建立頻道
channel = connection.channel()
# 新建一個隊列,用於接收消息
channel.queue_declare(queue="SH2")
# 注意,在rabbitmq中,消息要發送給隊列,必須通過交換(exchange)
# 可使用空字符串交換(exchange="")
# 精確的指定發送給哪一個隊列(routing_key=""),參數body值發送的數據
channel.basic_publish(exchange="",
                      routing_key="SH2",
                      body="SH2 來啦來啦!")
print("消息發送完成")
connection.close()
receive.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
# 拿到消息必須給rabbitmq服務端回覆ack,不然消息不會被刪除。防止客戶端出錯,數據丟失
import pika
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue="SH2")

def callback(ch,method,properties,body):
    print("消費者接收到了任務:%r"%body.decode("utf8"))
    # 演示報錯,消息仍然存在,取消下面的int註釋。
    # int("qwqwqwq")
    # 有消息來臨,當即執行callback,沒有消息則夯住,等待消息
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume("SH2",callback,False)
# 開始消費,接收消息
channel.start_consuming()

rabbitMq API
rabbitMq API
rabbitMq API

這裏只剩下一個隊列了,是由於剛剛電腦沒電,重啓了,重啓後全部的隊列,消息都沒有了。。這個是剛剛新建的。啦啦啦啦。。。。。。。。。。

rabbitMq API

2.3RabbitMQ消息持久化(durable,properties)

上面咱們看到,我重啓後,隊列所有沒有了。
爲了保證RabbitMQ在退出或者異常狀況下數據沒有丟失,須要將queue,exechange和Message都持久化。
持久化步驟:
1.隊列持久化
每次聲明隊列的時候,都加上durable,注意每一個隊列都要寫,客戶端和服務端聲明的時候都要寫。
# 在管道里聲明
queue channel.queue_declare(queue='hello2', durable=True)

2.消息持久化
發送端發送消息時,加上properties
properties=pika.BasicProperties( 
delivery_mode=2, # 消息持久化 
)
send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
# 建立頻道
channel = connection.channel()
# 新建一個隊列,用於接收消息
# 默認狀況下,此隊列不支持持久化,若是服務掛掉,數據丟失
# durable=True開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了
channel.queue_declare(queue="SH3", durable=True)

# delivery_mode=2表明消息持久化
channel.basic_publish(exchange="",
                      routing_key="SH3",
                      body="SH3 持久化 來啦來啦!",
                      # 數據持久化
                      properties=pika.BasicProperties(delivery_mode=2))
print("消息發送完成")
connection.close()

rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API

2.4RabbitMQ廣播模式(exchange)----消息訂閱發佈 Publish\Subscribe(消息發佈\訂閱)

前面的效果都是一對一發,若是作一個廣播效果可不能夠,這時候須要用到exchange了。
exchange必須明確的知道,收到的消息要發送給誰。
exchange的類型決定了怎麼處理。類型有如下幾種
1.fanout:exchange將消息發送給和該exchange鏈接的全部queue;也就是所謂的廣播模式;此模式下忽略routing_key
2.direct:經過routingKey和exchange決定的那個惟一的queue能夠接收消息,只有routing_key爲"black"時才能夠將其發送到隊列queue_name;
3.topic:全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

exchange type 過濾類型
    fanout = 廣播  
    direct = 組播  
    topic = 規則播   
    header = 略過。。。
注意:廣播是實時的,沒有客戶端接收,消息就沒有了,不會保存下來,不會等待客戶端啓動時接受消息。相似收音機。
因此在發送消息前,要先啓動客戶端,準備接受消息,而後啓動服務端發送消息。

2.4.1fanout純廣播/all

須要queue和exchange綁定,由於消費者不是和exchange直連的,消費者鏈接在queue上,queue綁定在exchange上,消費者只會在queue裏讀取消息。

rabbitMq API

send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
# 建立頻道
channel = connection.channel()
# 這裏是廣播,不須要聲明queue
channel.exchange_declare(exchange="log",  # 聲明廣播管道
                         exchange_type="fanout")
# delivery_mode=2表明消息持久化
channel.basic_publish(exchange="log",
                      routing_key="",  # 此處爲空,必須有
                      body="fanout 持久化 來啦來啦!")
print("消息發送完成")
connection.close()
client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange="log", exchange_type="fanout")
# 不指定queue名字,rabbit會隨機分配一個名字
# exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(queue="", exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename", queue_name)
channel.queue_bind(exchange="log",  # queue綁定到轉發器上
                   queue=queue_name)
print("Waiting for log!")

def callback(ch,method,properties,body):
    print("消費者接收到了任務:%r"%body.decode("utf8"))

# auto_ack設置爲False
channel.basic_consume(queue_name,callback,True)
# 開始消費,接收消息
channel.start_consuming()

rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API

2.4.2direct有選擇的接受消息

路由模式,經過routing_key將消息發送給對應的queue;
以下面這句話,能夠設置exchange爲direct模式,只有routing_key爲"black"時纔將其發送到隊列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

rabbitMq API

上圖中,Q1和Q2能夠綁定同一個key,如綁定routing_key="KeySame";
那麼受到routing_key爲KeySame的消息時,將會同時發送給Q1和Q2,退化爲廣播模式。
send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import sys
# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
# 建立頻道
channel = connection.channel()
# 這裏是廣播,不須要聲明queue
channel.exchange_declare(exchange="direct_logs",  # 聲明廣播管道
                         exchange_type="direct")
# 重要程度級別,這裏默認定義爲 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange="direct_logs",
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
client.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import sys
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
# 生產者和消費者端都要聲明隊列,以排除生成者未啓動,消費者獲取報錯的問題
channel.exchange_declare(exchange="direct_logs", exchange_type="direct")
# 不指定queue名字,rabbit會隨機分配一個名字
# exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(queue="", exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename", queue_name)
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
# 循環列表去綁定
for severity in severities:
    print(severity)
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print("Waiting for log!")

def callback(ch,method,properties,body):
    print(" [x] %r:%r" % (method.routing_key, body))

# auto_ack設置爲False
channel.basic_consume(queue_name,callback,True)
# 開始消費,接收消息
channel.start_consuming()

rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API
rabbitMq API

2.4.3topic規則播

topic模式相似於direct模式,只是其中的routing_key變成了一個有「.」分隔的字符串,「.」將字符串分割成幾個單詞,
每一個單詞表明一個條件;

話題類型,能夠根據正則進行更精確的匹配,按照規則過濾。
exchange_type="topic"。
在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入"路由值"和"關鍵字"進行匹配,匹配成功,將數據發送到指定隊列。
#表示能夠匹配0個或多個單詞
*表示只能匹配一個單詞

2.5.關鍵字發佈

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,
即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

rabbitMq API

send.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import sys
# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
# 建立頻道
channel = connection.channel()
# 這裏是廣播,不須要聲明queue
channel.exchange_declare(exchange='m2', exchange_type='direct')

channel.basic_publish(exchange="m2",
                      routing_key="vita",
                      body="vita send message")
print("消息發送完成")
connection.close()
client0.py 

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import sys
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange="m2", exchange_type="direct")
# 不指定queue名字,rabbit會隨機分配一個名字
# exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(queue="", exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename", queue_name)
# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='vita')
def callback(ch,method,properties,body):
    print(" [x] %r:%r" % (method.routing_key, body))

# auto_ack設置爲False
channel.basic_consume(queue_name,callback,True)
# 開始消費,接收消息
channel.start_consuming()

rabbitMq API

client1.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import sys
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("admin","123456")
connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.61',credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange="m2", exchange_type="direct")
# 不指定queue名字,rabbit會隨機分配一個名字
# exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(queue="", exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename", queue_name)
# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='lili')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='vita')
def callback(ch,method,properties,body):
    print(" [x] %r:%r" % (method.routing_key, body))

# auto_ack設置爲False
channel.basic_consume(queue_name,callback,True)
# 開始消費,接收消息
channel.start_consuming()

rabbitMq API

send.py中
routing_key="lili",
則只有綁定這個歌routing_key的客戶端能收到消息

rabbitMq API
rabbitMq API

3.RPC

上面的全部例子中,隊列都是單向的,一端發送,一端接收。
若是發送端想接收消費端處理的數據,怎麼辦呢,這就須要RPC(remote procedure call)遠程過程調用了。

rabbitMq API
rabbitMq API

如圖咱們能夠看出生產端client向消費端server請求處理數據,他會經歷以下幾回來完成交互。
1.生產端 生成rpc_queue隊列,這個隊列負責把消息發給消費端。
2.生產端 生成另一個隨機隊列callback_queue,這個隊列是發給消費端,消費者用這個隊列把處理好的數據發送給生產端。
3.生產端 生成一組惟一字符串UUID,發送給消費者,消費者會把這串字符做爲驗證在發給生產者。
4.當消費端處理完數據,發給生產端,會把處理數據與UUID一塊兒經過隨機生產的隊列callback_queue發回給生產端。
5.生產端,會使用while循環 不斷檢測是否有數據,並以這種形式來實現阻塞等待數據,來監聽消費端。
6.生產端獲取數據調用回調函數,回調函數判斷本機的UUID與消費端發回UID是否匹配,因爲消費端,可能有多個,且處理時間不等因此須要判斷,判斷成功賦值數據,while循環就會捕獲到,完成交互。

rabbitMq API

send.py

import queue
import pika
import uuid
import time
class FibRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials("admin", "123456")
        # 1.建立鏈接
        self.connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
        self.channel = self.connection.channel()
        # 2.生成隨機queue
        # exclusive = True,消費者端斷開鏈接,隊列刪除
        result = self.channel.queue_declare(queue="", exclusive=True)
        # 隨機獲取queue名字,發送數據給消費端
        self.callback_queue = result.method.queue
        # self.on_response回調函數:只要收到消息就調用這個函數
        # 聲明收到消息後,收queue=self.callback_queue內的消息
        self.channel.basic_consume(self.callback_queue, self.on_response, True)

    def on_response(self, ch, method, props, body):
        """
        收到消息就調用該函數
        :param ch: 管道內存對象
        :param method: 消息發送給哪一個queue
        :param props:
        :param body: 數據對象
        :return:
        """
        # 判斷隨機生成的ID與消費者端發過來的ID是否相同,
        if self.corr_id == props.correlation_id:
            # 將body值給self.response
            print("接收到客戶端發送的信息:", body)
            self.response = body

    def call(self, n):
        # 賦值變量,一個循環值
        self.response = None
        # 隨機生成惟一的字符串
        self.corr_id = str(uuid.uuid4())

        self.channel.basic_publish(exchange="",
                                   routing_key="rpc_queue",
                                   properties=pika.BasicProperties(
                                       # 告訴消費端,執行命令成功後把結果返回給該隊列
                                       reply_to=self.callback_queue,
                                       # 生成UUID,發送給消費端
                                       correlation_id=self.corr_id,
                                   ),
                                   # 發的消息,必須傳入字符串,不能傳數字
                                   body=str(n))
        # 沒有數據就循環接收數據
        while self.response is None:
            # 非阻塞版的start_consuming()
            # 沒有消息不會阻塞
            self.connection.process_data_events()
            print("client does not send data")
            time.sleep(1)
        # 接收到了消費端的結果,返回
        return int(self.response)

fib_rpc = FibRpcClient()
print("[x] Requesting fib(6)")
response = fib_rpc.call(6)
print(" [.] Got %r" % response)

rabbitMq API

receive.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author: vita
import pika
import subprocess
import time
import sys

# 建立憑證,使用rabbitmq用戶名/密碼登陸
credentials = pika.PlainCredentials("admin", "123456")
# 建立鏈接
connection = pika.BlockingConnection(pika.ConnectionParameters("10.0.0.61", credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue="rpc_queue")

def fib(n):
    """
    斐波那契數列
    :param n:
    :return:
    """
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange="",
                     # 數據發送到生產端隨機生成的queue
                     routing_key=props.reply_to,
                     # 同時把correlation_id值設置爲生產端傳過來的值。
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id,
                     ),
                     # 把fib()的結果返回給生產端
                     body=str(response))
    # 確保任務完成
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume("rpc_queue", on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()

rabbitMq API

相關文章
相關標籤/搜索