一、爲何用消息隊列html
舉例:前端
好比在一個企業裏,技術老大接到boss的任務,技術老大把這個任務拆分紅多個小任務,完成全部的小任務就算搞定整個任務了。 那麼在執行這些小任務的時候,可能有一個環節很費時間,而且優先級很低,推遲完成也不影響整個任務運轉,那麼技術老大就會將這個很費時間,且不重要的任務,丟給他的小弟去解決,本身繼續完成其餘任務。
轉化爲計算機思想:java
那個技術老大就是一個 程序系統,那個小弟就是消息隊列。 當程序系統發現某些任務耗費時間且優先級較低,遲點完成也不影響整個任務,就把這個任務丟給消息隊列。
場景:python
在程序系統中,例如外賣系統,訂單系統,庫存系統,優先級較高 發紅包,發郵件,發短信,app消息推送等任務優先級很低,很適合交給消息隊列去處理,以便於程序系統更快的處理其餘請求。
消息隊列工做流程:web
消息隊列通常有三個角色:
隊列服務端
隊列生產者
隊列消費者
消息隊列工做流程就如同一個流水線,有一個輸送帶,有產品加工,一個打包產品
輸送帶 就是 不停運轉的消息隊列服務端
加工產品的 就是 隊列生產者
在傳輸帶結尾打包產品的 就是 隊列消費者
隊列產品:正則表達式
RabbitMQ Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等 ZeroMQ saltstack軟件使用此消息,速度最快 Redis key-value的系統,也支持隊列數據結構,輕量級消息隊列 Kafka 由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺
一個app系統消息隊列工做流程:centos
消費者,一個後臺進程,不斷的去檢測消息隊列中是否有消息,有消息就取走,開啓新線程去處理業務,若是沒有一會再來
二、消息隊列的做用緩存
1)程序解耦
容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。
2)冗餘
消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。
許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
3)峯值處理能力
(大白話,就是原本公司業務只須要5臺機器,可是臨時的秒殺活動,5臺機器確定受不了這個壓力,咱們又不可能將總體服務器架構提高到10臺,那在秒殺活動後,機器不就浪費了嗎?所以引入消息隊列)
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。
若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。
使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
4)可恢復性
系統的一部分組件失效時,不會影響到整個系統。
消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
5)順序保證
在大多使用場景下,數據處理的順序都很重要。
大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)
6)緩衝
有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。
7)異步通訊
不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。
消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。安全
一、你瞭解的消息隊列服務器
生活裏的消息隊列,如同郵局的郵箱,若是沒郵箱的話,發郵件必須找到收郵件那我的,遞給他,纔算完成,那這個任務會處理的很麻煩,很慢,效率很低。
可是若是有了郵箱,郵件直接丟給郵箱,用戶只須要去郵箱裏面去找,有沒有郵件,有就拿走,沒有就下次再來,這樣能夠極大的提高郵件收發效率!
rabbitmq是一個消息代理,它接收和轉發消息,能夠理解爲是生活中的郵局。
你能夠將郵件放在郵箱裏,你能夠肯定有郵遞員會發送郵件給收件人。
歸納:rabbitmq是接收,存儲,轉發數據的。
官方教程:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。
二、公司什麼狀況下會用消息隊列
2.一、電商訂單
想必你們都點過外賣,點擊下單後的業務邏輯可能包括:檢查庫存、生成單據、發紅包、短信通知等,若是這些業務同步執行,完成下單率會很是低,如發紅包,短信通知等沒必要要的流程,異步執行便可。
此時使用MQ,能夠在核心流程(扣減庫存、生成訂單記錄)等完成後發送消息到MQ,快速結束本次流程。消費者拉取MQ消息時,發現紅包、短信等消息時,再進行處理。
場景:雙11是購物狂節,用戶下單後,訂單系統須要通知庫存系統,傳統的作法就是訂單系統調用庫存系統的接口。
這樣作有以下缺點:
引入消息隊列,以下圖:
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統:訂閱下單的消息,獲取下單消息,進行庫操做。就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會致使消息丟失。
2.二、秒殺活動
流量削峯通常在秒殺活動中應用普遍。
場景:秒殺活動,通常會由於流量過大,致使應用掛掉,爲了解決這個問題,通常在應用前端加入消息隊列。
做用:
用戶的請求,服務器接收到以後,寫入消息隊列,超過定義的閾值就直接丟棄請求,或者跳轉錯誤頁面。
業務系統取出隊列中的消息,再作後續處理。
3.1 yum安裝
下載 centos 源
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.cloud.tencent.com/repo/centos7_base.repo
下載 epel 源
wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
清空yum緩存並生成新的yum緩存
yum clean all yum makecache
安裝 erlang
yum -y install erlang
安裝 RabbitMQ
yum -y install rabbitmq-server
3.2 rabbitmq 相關命令
# 啓動/中止/重啓/查看狀態 rabbitmq-server(無用戶名密碼): systemctl start/stop/restart/status rabbitmq-server
設置rabbitmq帳號密碼,以及角色權限設置 # 新建用戶 rabbitmqctl add_user {用戶名} {密碼} # 示例:設置新用戶wll 密碼123 sudo rabbitmqctl add_user wll 123 # 爲用戶設置權限 rabbitmqctl set_user_tags {用戶名} {權限} # 示例:設置用戶wll爲administrator角色 sudo rabbitmqctl set_user_tags wll administrator # 設置權限,容許對全部的隊列都有權限 # 對何種資源具備配置、寫、讀的權限經過正則表達式來匹配,具體命令語法:rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read> sudo rabbitmqctl set_permissions -p "/" wll ".*" ".*" ".*" # 重啓服務生效設置 service rabbitmq-server restart # 查看用戶列表 rabbitmqctl list_users # 刪除用戶 rabbitmqctl delete_user {用戶名} # 修改用戶的密碼 rabbitmqctl change_password {用戶名} {新密碼} # 添加 Virtual Hosts rabbitmqctl add_vhost <vhost> # 查看 Virtual Hosts 列表 rabbitmqctl list_vhosts # 刪除 Virtual Hosts rabbitmqctl delete_vhost <vhost> # 使用戶user1具備 vhost1 這個 Virtual Host 中全部資源的配置、寫、讀權限以便管理其中的資源 rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*' # 查看權限 rabbitmqctl list_user_permissions user1 rabbitmqctl list_permissions -p vhost1 # 清除用戶 username 對 VHostPath 資源的全部權限 rabbitmqctl clear_permissions -p <VHostPath> <username>
# 開啓web界面rabbitmq rabbitmq-plugins enable rabbitmq_management # 訪問web界面 http://server-name:15672/
# 查看全部的 exchange rabbitmqctl list_exchanges # 查看全部的 queue rabbitmqctl list_queues # 查看全部的綁定(exchange和queue的綁定信息) rabbitmqctl list_bindings # 查看消息確認信息 rabbitmqctl list_queues name messages_ready messages_unacknowledged # 查看RabbitMQ狀態,包括版本號等信息 rabbitmqctl status
rabbitmq 清空隊列命令 # 清空隊列前必須提早關閉應用,不然不能清除 rabbitmqctl stop_app # 清空隊列 rabbitmqctl reset # 啓動應用 rabbitmqctl start_app # 此時查看隊列,可看到 listing 及 queues都是空的 rabbitmqctl list_queues
AMQP:AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。
它主要包括如下組件: 1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。 2.Virtual Host: 實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host。 3.Exchange: 接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。 4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。 5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。 6.Binding: Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 7.Connection: 鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。 8.Channel: 信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。 9.Command: AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
// rabbitmq官方推薦的python客戶端pika模塊
pip3 install pika
生產者消費者模型,以下圖:
P 是生產者,C是消費者,中間hello是消息隊列,能夠有多個P、多個C。
P發送消息給hello隊列,C消費者從隊列中獲取消息,默認輪詢方式。
生產者send.py
咱們的第一個程序send.py將向隊列發送一條消息。咱們須要作的第一件事是創建與RabbitMQ服務器的鏈接。
#!/usr/bin/env python import pika username = "wll" # rabbitmq用戶名 pwd = "123" # rabbitmq用戶名對應的密碼 vhost = "vh1" # virtual host ip_addr = "10.10.10.80" # rabbitmq服務地址 port = 5672 # rabbitmq服務端口 # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials(username, pwd) # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, virtual_host=vhost, credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫"balance" channel.queue_declare(queue='balance') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body指發送的數據 channel.basic_publish(exchange='', routing_key='balance', body=b'hello world!') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
消費者receive.py
能夠同時存在多個消費者,等待接收隊列的消息,默認是輪詢的方式分配消息。能夠運行屢次,運行多個消費者。
import pika
# 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("wll", "123") connection = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.80', vitual_host='vh1', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="balance") def callback(ch, method, properties, body): print("消費者接收到了任務:%r"%body.decode("utf-8"))
# 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息
# 老百姓開始去郵箱取郵件啦,隊列名字是balance channel.basic_consume(queue="balance", on_message_callback=callback, auto_ack=True)
# 開始消費,接收消息 channel.start_consuming()
練習:
分別啓動生產者、兩個消費者,往隊列發送數據,查看消費者的結果!
使用場景:一個發送端,多個接收端,如分佈式的任務派發。爲了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時爲了防止接收端在處理消息時down掉,只有在消息處理完成後才發送ack消息。
7.1 rabbitmq消息確認之ack
官網資料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html
默認狀況下,生產者發送數據給隊列,消費者取出消息後,數據將被清除。
特殊狀況,若是消費者處理過程當中,出現錯誤,數據處理沒有完成,那麼,這段數據將從隊列丟失。
7.2 no-ack 機制
不確認機制也就是說每次消費者接收到數據後,無論是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除。
7.3 ack 機制
ACK機制用於保證消費者若是拿了隊列的消息,客戶端
處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取。
流程:
1.生產者無須變更,發送消息 2.消費者若是 auto_ack=True,數據消費後若是出錯就會丟失;反之 auto_ack=False,數據消費若是出錯,數據也不會丟失 3.ack機制在消費者代碼中演示
生產者 send.py(只負責發送數據便可,無需變更)
#!/usr/bin/env python import pika username = "wll" # rabbitmq用戶名 pwd = "123" # rabbitmq用戶名對應的密碼 vhost = "vh1" # virtual host ip_addr = "10.10.10.80" # rabbitmq服務地址 port = 5672 # rabbitmq服務端口 # 建立憑證,使用rabbitmq用戶密碼登陸 # 去郵局取郵件,必須得驗證身份 credentials = pika.PlainCredentials(username, pwd) # 新建鏈接,這裏localhost能夠更換爲服務器ip # 找到這個郵局,等於鏈接上服務器 connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, virtual_host=vhost, credentials=credentials)) # 建立頻道 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接 channel = connection.channel() # 聲明一個隊列,用於接收消息,隊列名字叫"balance" channel.queue_declare(queue='balance') # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body指發送的數據 channel.basic_publish(exchange='', routing_key='balance', body=b'hello world!') print("已經發送了消息") # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接 connection.close()
消費者 receive.py(給與 ack 回覆,拿到消息必須給rabbitmq服務端回覆ack信息,不然消息不會被刪除,防止客戶端出錯,數據丟失)
import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("wll", "123") connection = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.80', vitual_host='vh1', credentials=credentials)) channel = connection.channel() channel.queue_declare(queue="balance") def callback(ch, method, properties, body): print("消費者接收到了任務:%r"%body.decode("utf-8")) # 我告訴rabbitmq服務端,我已經取走了消息 # 回覆方式在這 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆 channel.basic_consume(queue="balance", on_message_callback=callback, auto_ack=False) # 開始消費,接收消息 channel.start_consuming()
7.4 消息持久化
演示 1.執行生產者,向隊列寫入數據,產生一個新隊列queue 2.重啓服務端,隊列丟失 3.開啓生產者數據持久化後,重啓rabbitmq,隊列不丟失 4.依舊能夠讀取數據
消息的可靠性是RabbitMQ的一大特點,那麼RabbitMQ是如何保證消息可靠性的呢——消息持久化。 爲了保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。
生產者 send.py
import pika import time username = 'wll' pwd = '123' ip_addr = '10.10.10.80' vhost = 'vh1' port_num = 5672 # 無密碼 # connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr) # 有密碼 credentials = pika.PlainCredentials(username, pwd) connection = pika.BlockingConnection(pika.ConnectionParameters(ip_addr, virtual_host=vhost, credentials=credentials)) channel = connection.channel() # 建立一個名爲 shop 的隊列 # 默認此隊列不支持持久化,若是服務掛掉,數據丟失 # durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了 ''' 實現rabbitmq持久化條件:delivery_mode=2 使用 durable=True 聲明 queue是持久化 ''' channel.queue_declare(queue='shop', durable=True) for i in range(100): message_str = 'Hello%s!' % i channel.basic_publish( exchange='', routing_key='shop', body=message_str.encode("utf-8"), properties=pika.BasicProperties( delivery_mode=2 # 表明消息是持久的 ) ) print("Sent 'Hello%s!'" % i) time.sleep(0.5) connection.close()
消費者 receive.py
import pika # 創建與rabbitmq的鏈接 credentials = pika.PlainCredentials("wll", "123") connection = pika.BlockingConnection(pika.ConnectionParameters('10.10.10.80', virtual_host='vh1', credentials=credentials)) channel = connection.channel() # 確保隊列持久化 channel.queue_declare(queue="shop", durable=True) ''' 必須確保給與服務器消息回覆,表明我已經消費了數據,不然數據一直持久化,不會消失 ''' def callback(ch, method, properties, body): print("消費者接收到了任務:%r" % body.decode("utf-8")) # 模擬代碼報錯 int('error') # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失 # 告訴服務端,我已經取走了數據,不然數據一直存在 ch.basic_ack(delivery_tag=method.delivery_tag) # 關閉auto_ack 表明 給與回覆確認 channel.basic_consume(queue="shop", on_message_callback=callback, auto_ack=False) # 開始消費,接收消息 channel.start_consuming()
7.5 Exchange模型
rabbitmq發送消息首先是發給exchange,而後再經過exchange發送消息給隊列(queue)。
exchange有四種模式:
fanout:exchange將消息發送給和該exchange鏈接的全部queue;也就是所謂的廣播模式;此模式下忽略routing_key;
direct:路由模式,經過routing_key將消息發送給對應的queue;以下面這句便可設置exchange爲direct模式,只有routing_key爲 "black" 時纔將其發送到隊列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')
在上圖中,Q1和Q2能夠綁定同一個key,如綁定routing_key='KeySame',那麼收到routing_key爲KeySame的消息時將會同時發送給Q1和Q2,退化爲廣播模式;
top:topic模式相似於direct模式,只是其中的routing_key變成了一個有 "." 分隔的字符串,"." 將字符串分割成幾個單詞,每一個單詞表明一個條件;
headers:headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配;
官方教程:https://www.rabbitmq.com/tutorials/tutorial-three-python.html
7.6 關鍵字發佈exchange
將一個函數運行在遠程計算機上而且等待獲取那裏的結果,這個稱做遠程過程調用(Remote Procedure Call)或者 RPC。
RPC是一個計算機通訊協議。
比喻:將計算機服務運行理解爲廚師作飯,廚師想作一個小蔥拌豆腐,廚師須要洗小蔥、切豆腐、調汁、涼拌。他一我的完成全部的事,如同古老的集中式應用,一臺計算機作全部的事。
製做小蔥拌豆腐 {
廚師 > 洗小蔥 > 切豆腐 > 涼拌
}
rpc應用場景:而現在,飯店作大了,有錢了,專職分工來幹活,再也不是廚師單打獨鬥,備菜師傅準備小蔥、豆腐,切菜師傅切小蔥、豆腐,廚師只負責調味,完成食品。
製做小蔥拌豆腐 {
備菜師 > 洗菜
切菜師 > 切菜
廚師 > 調味
}
此時一件事好多人在作,廚師就得和其餘人溝通,通知備菜、洗菜師傅的這個動做就是遠程過程調用(RPC)。
這個過程在計算機系統中,一個電商的下單過程,涉及物流、支付、庫存、紅包等多個系統,多個系統又在多個服務器上,由不一樣的技術團隊負責,整個下單過程,須要全部團隊進行遠程調用。
下單 {
庫存 > 減小庫存
支付 > 扣款
紅包 > 減免紅包
物流 > 生成訂單
}
8.1 到底什麼是rpc
rpc指的是在計算機A上的進程,調用另一臺計算機B的進程,A上的進程被掛起,B上的被調用進程開始執行後,產生返回值給A,A繼續執行。
調用方能夠經過參數將信息傳遞給被調用方,然後經過返回結果獲得信息,這個過程對於開發人員來講是透明的。
如同廚師同樣,服務員把菜單給後廚,廚師告訴洗菜人,備菜人,開始工做,完成工做後,整個過程對於服務員是透明的,他徹底不用管後廚是怎麼把菜作好的。
因爲服務在不一樣的機器上,遠程調用必經網絡通訊,調用服務必須寫一坨網絡通訊代碼,很容易出錯且很複雜,所以就出現了RPC框架。
阿里巴巴的 Dubbo java
新浪的 Motan java
谷歌的 gRPC 多語言
Apache thrift 多語言
rpc封裝了數據的序列化,反序列化,以及傳輸協議
8.2 python實現RPC
利用RabbitMQ構建一個RPC系統,包含了客戶端和RPC服務器,依舊使用pika模塊。
8.3 Callback queue 回調隊列
一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to
。
8.4 Correlation id 關聯標識
一個客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端沒法辨別在回調隊列中的響應具體和哪一個請求是對應的。爲了處理這種狀況,客戶端在發送每一個請求時,同時會附帶一個獨有correlation_id
屬性,這樣客戶端在回調隊列中根據correlation_id
字段的值就能夠分辨此響應屬於哪一個請求。
客戶端發送請求:某個應用將請求信息交給客戶端,而後客戶端發送RPC請求,在發送RPC請求到RPC請求隊列時,客戶端至少發送帶有reply_to以及correlation_id兩個屬性的信息。
服務器端工做流: 等待接受客戶端發來RPC請求,當請求出現的時候,服務器從RPC請求隊列中取出請求,而後處理後,將響應發送到reply_to指定的回調隊列中。
客戶端接受處理結果: 客戶端等待回調隊列中出現響應,當響應出現時,它會根據響應中correlation_id字段的值,將其返回給對應的應用。
過程 1.啓動rpc客戶端,等待接收數據到來,來了以後就進行處理,再將結果丟進隊列 2.啓動rpc服務端,發起請求
rpc_server.py
import pika import uuid class FibonacciRpcClient(object): def __init__(self): # 客戶端啓動時,建立回調隊列,會開啓會話用於發送RPC請求以及接受響應 # 創建鏈接,指定服務器的ip地址 self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.119.10')) # 創建一個會話,每一個channel表明一個會話任務 self.channel = self.connection.channel() # 聲明回調隊列,再次聲明的緣由是,服務器和客戶端可能前後開啓,該聲明是冪等的,屢次聲明,但只生效一次 #exclusive=True 參數是指只對首次聲明它的鏈接可見 #exclusive=True 會在鏈接斷開的時候,自動刪除 result = self.channel.queue_declare(exclusive=True) # 將次隊列指定爲當前客戶端的回調隊列 self.callback_queue = result.method.queue # 客戶端訂閱回調隊列,當回調隊列中有響應時,調用`on_response`方法對響應進行處理; self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) # 對回調隊列中的響應進行處理的函數 def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body # 發出RPC請求 # 例如這裏服務端就是一個切菜師傅,菜切好了,須要傳遞給洗菜師傅,這個過程是發送rpc請求 def call(self, n): # 初始化 response self.response = None # 生成correlation_id 關聯標識,經過python的uuid庫,生成全局惟一標識ID,保證時間空間惟一性 self.corr_id = str(uuid.uuid4()) # 發送RPC請求內容到RPC請求隊列`s14rpc`,同時發送的還有`reply_to`和`correlation_id` self.channel.basic_publish(exchange='', routing_key='s14rpc', properties=pika.BasicProperties( reply_to=self.callback_queue, correlation_id=self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) # 創建客戶端 fibonacci_rpc = FibonacciRpcClient() # 發送RPC請求,丟進rpc隊列,等待客戶端處理完畢,給與響應 print("發送了請求sum(99)") response = fibonacci_rpc.call(99) print("獲得遠程結果響應%r" % response)
rpc_client.py
import pika # 創建鏈接,服務器地址爲localhost,可指定ip地址 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.119.10')) # 創建會話 channel = connection.channel() # 聲明RPC請求隊列 channel.queue_declare(queue='s14rpc') # 模擬一個進程,例如切菜師傅,等着洗菜師傅傳遞數據 def sum(n): n+=100 return n # 對RPC請求隊列中的請求進行處理 def on_request(ch, method, props, body): print(body,type(body)) n = int(body) print(" 正在處理sum(%s)" % n) # 調用數據處理方法 response = sum(n) # 將處理結果(響應)發送到回調隊列 ch.basic_publish(exchange='', # reply_to表明回覆目標 routing_key=props.reply_to, # correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。 properties=pika.BasicProperties(correlation_id= \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) # 負載均衡,同一時刻發送給該服務器的請求不超過一個 channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue='s14rpc') print("等待接收rpc請求") #開始消費 channel.start_consuming()