01 Rabbitmq

  在程序系統中,例如外賣系統,訂單系統,庫存系統,優先級較高 發紅包,發郵件,發短信,app消息推送等任務優先級很低,很適合交給消息隊列去處理,以便於程序系統更快的處理其餘請求。html

消息隊列工做流程:前端

1 1 消息隊列通常有三個角色:
2 2 隊列服務端
3 3 隊列生產者
4 4 隊列消費者
5 5 消息隊列工做流程就如同一個流水線,有產品加工,一個輸送帶,一個打包產品
6 6 輸送帶就是 不停運轉的消息隊列服務端
7 7 加工產品的就是 隊列生產者
8 8 在傳輸帶結尾打包產品的 就是隊列消費者
View Code

 

隊列產品:java

 1  1 RabbitMQ
 2  2 Erlang編寫的消息隊列產品,企業級消息隊列軟件,支持消息負載均衡,數據持久化等。
 3  3 
 4  4 ZeroMQ 
 5  5 saltstack軟件使用此消息,速度最快。
 6  6 
 7  7 Redis
 8  8 key-value的系統,也支持隊列數據結構,輕量級消息隊列
 9  9 
10 10 Kafka
11 11 由Scala編寫,目標是爲處理實時數據提供一個統1、高通量、低等待的平臺
View Code

 

一個app系統隊列工做流程:python

1 消費者,一個後臺進程,不斷的去檢測消息隊列中是否有消息,有消息就取走,開啓新線程去處理業務,
若是沒有一會再來

消息隊列的做用:web

 1  1 (1)程序解耦
 2  2     容許你獨立的修改或擴展兩邊的處理過程,只要確保它們遵照一樣的接口約束
 3  3 (2)冗餘
 4  4     消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。
 5  5 許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
 6  6 (3)峯值處理能力(削峯)
 7  7     舉例秒殺活動或者搶票.
 8  8     在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見。若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。
 9 10    使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
10 11 (4)可恢復性
11 12     系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
12 13 (5)順序保證
13 14     在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。
14 15 (6)緩衝
15 16     有助於控制和優化數據流通過系統的速度,解決生產消息和消費消息的處理速度不一致的狀況。
16 17 (7)異步通訊
17 18     不少時候,用戶不想也不須要當即處理消息。好比發紅包,發短信等流程。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。
View Code

 

rabbitMQ正則表達式

1、生活中消息隊列的舉例:centos

1 生活裏的消息隊列,如同郵局的郵箱,
2 若是沒郵箱的話,
3 郵件必須找到郵件那我的,遞給他,才玩完成,那這個任務會處理的很麻煩,很慢,效率很低
4 
5 可是若是有了郵箱,
6 郵件直接丟給郵箱,用戶只須要去郵箱裏面去找,有沒有郵件,有就拿走,沒有就下次再來,這樣能夠極大的提高郵件收發效率!
View Code

 

rabbitmq是接收,存儲,轉發數據的.緩存

  消息是應用間傳送數據的通訊方式.消息能夠很是簡單,能夠是字符串,也能夠是序列化之後的複雜數據.消息隊列是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞.消息發佈者只須要把消息發送到MQ中而不用管誰來取,消息使用者只管從MQ取消息而無論誰發佈的.這樣發佈者和消費者都不須要知道對方是否存在.安全

 2、什麼狀況下用到消息隊列?服務器

  1. 電商訂單

    點外賣的業務邏輯可能會包括: 檢查庫存、生成的單據、發優惠紅包、短信通知等。若是這些個業務同一塊執行,完成下單的效率是很是的低的,發紅包、短信通知這些不是完成一個訂單必須的業務,就能夠異步執行。

    異步執行用到MQ,能夠在覈心流程(扣減庫存,生成訂單消息)等完成後發送消息到MQ,快速結束本次流程。消費者拉取MQ消息時,發現紅包、短信等消息時,再進行處理。

場景:雙11是購物狂節,用戶下單後,訂單系統須要通知庫存系統,傳統的作法就是訂單系統調用庫存系統的接口

 

這種作法有一個缺點:

  • 當庫存系統出現故障時,訂單就會失敗。(這樣馬雲將少賺好多好多錢錢。。。。)

  • 訂單系統和庫存系統高耦合.

引入消息隊列

 

  • 訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

  • 庫存系統:訂閱下單的消息,獲取下單消息,進行庫操做。

  二、秒殺活動

流量削峯通常在秒殺活動中應用普遍 場景:秒殺活動,通常會由於流量過大,致使應用掛掉,爲了解決這個問題,通常在應用前端加入消息隊列。 做用: 1.能夠控制活動人數,超過此必定閥值的訂單直接丟棄(怪不得我一次秒殺都沒搶到過。。。。。wtf???)

2.能夠緩解短期的高流量壓垮應用(應用程序按本身的最大處理能力獲取訂單)

 

 

3.用戶的請求,服務器接收到以後,寫入消息隊列,超過定義的閾值就直接丟棄請求,或者跳轉錯誤頁面。

4.業務系統取出隊列中的消息,再作後續處理。

3、Rabbit MQ安裝

 1 rabbitmq-server服務端
 2 
 3 1.下載centos源
 4 wget -O /etc/yum.repos.d/CentOS-Base.repo   http://mirrors.cloud.tencent.com/repo/centos7_base.repo
 5 2.下載epel源
 6 wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
 7 3.清空yum緩存而且生成新的yum緩存
 8 yum clean all
 9 yum makecache
10 4.安裝erlang
11    $ yum -y install erlang
12 5.安裝RabbitMQ
13    $ yum -y install rabbitmq-server
14 6.啓動(無用戶名密碼):
15     systemctl start/stop/restart/status rabbitmq-server
16 17 設置rabbitmq帳號密碼,以及角色權限設置
18 
19 # 設置新用戶yugo 密碼123
20 sudo rabbitmqctl add_user yugo 123
21 22 # 設置用戶爲administrator角色
23 sudo rabbitmqctl set_user_tags yugo administrator
24 25 # 設置權限,容許對全部的隊列都有權限
26 對何種資源具備配置、寫、讀的權限經過正則表達式來匹配,具體命令以下:
27 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
28 
29 sudo rabbitmqctl set_permissions -p "/" yugo ".*" ".*" ".*"
30 31 #重啓服務生效設置
32 service rabbitmq-server start/stop/restart
33 rabbitmq相關命令
34 
35 // 新建用戶
36 rabbitmqctl add_user {用戶名} {密碼}
37 38 // 設置權限
39 rabbitmqctl set_user_tags {用戶名} {權限}
40 41 // 查看用戶列表
42 rabbitmqctl list_users
43 44 // 爲用戶受權
45 添加 Virtual Hosts :    
46 rabbitmqctl add_vhost <vhost>    
47 48 // 刪除用戶
49 rabbitmqctl delete_user Username
50 51 // 修改用戶的密碼
52 rabbitmqctl change_password Username Newpassword
53     
54 // 刪除 Virtual Hosts :    
55 rabbitmqctl delete_vhost <vhost>    
56     
57 // 添加 Users :    
58 rabbitmqctl add_user <username> <password>    
59 rabbitmqctl set_user_tags <username> <tag> ...    
60 rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>    
61     
62 // 刪除 Users :    
63 delete_user <username>   
64 65 // 使用戶user1具備vhost1這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源
66 rabbitmqctl  set_permissions -p vhost1 user1 '.*' '.*' '.*' 
67 68 // 查看權限
69 rabbitmqctl list_user_permissions user1
70 71 rabbitmqctl list_permissions -p vhost1
72 73 // 清除權限
74 rabbitmqctl clear_permissions [-p VHostPath] User
75 76 //清空隊列步驟
77 rabbitmqctl reset 
78 須要提早關閉應用rabbitmqctl stop_app ,
79 而後再清空隊列,啓動應用
80 rabbitmqctl start_app
81 此時查看隊列rabbitmqctl list_queues
82 83 查看全部的exchange:                              rabbitmqctl list_exchanges
84 查看全部的queue:                                 rabbitmqctl list_queues
85 查看全部的用戶:                                   rabbitmqctl list_users
86 查看全部的綁定(exchange和queue的綁定信息):         rabbitmqctl list_bindings
87 查看消息確認信息:
88 rabbitmqctl list_queues name messages_ready messages_unacknowledged
89 查看RabbitMQ狀態,包括版本號等信息:rabbitmqctl status
90 
91 #開啓web界面rabbitmq
92 rabbitmq-plugins enable rabbitmq_management
93 
94 #訪問web界面
95 http://server-name:15672/
服務端

 Rabbit MQ組件解釋

 1 AMQP協議是一個高級抽象層消息通訊協議,RabbitMQ是AMQP協議的實現。它主要包括如下組件:
 2 1.Server(broker): 接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。
 3 
 4 2.Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個Exchange和Queue,可是權限控制的最小粒度是Virtual Host
 5 
 6 3.Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三種,不一樣類型的Exchange路由的行爲是不同的。
 7 
 8 4.Message Queue:消息隊列,用於存儲還未被消費者消費的消息。
 9 
10 5.Message: 由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。
11 
12 6.Binding:Binding聯繫了Exchange與Message Queue。Exchange在與多個Message Queue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing Key,Exchange根據Routing Key與Exchange Type將Message路由到Message Queue。Binding Key由Consumer在Binding Exchange與Message Queue時指定,而Routing Key由Producer發送Message時指定,二者的匹配方式由Exchange Type決定。 
13 
14 7.Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。
15 
16 8.Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection。
17 
18 9.Command:AMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。
AMQP

 python客戶端

1 // rabbitmq官方推薦的python客戶端pika模塊
2 pip3 install pika

應用場景一、單發送單接收

  生產者消費者模型

P   是生產者
C 是消費者
中間hello是消息隊列
能夠有多個P、多個C

P發送消息給hello隊列,C消費者從隊列中獲取消息,默認輪詢方式

 

 

生產者send.py

咱們的第一個程序send.py將向隊列發送一條消息。咱們須要作的第一件事是創建與RabbitMQ服務器的鏈接。
 1 #!/usr/bin/env python
 2 import pika
 3 # 建立憑證,使用rabbitmq用戶密碼登陸
 4 # 去郵局取郵件,必須得驗證身份
 5 credentials = pika.PlainCredentials("s14","123")
 6 # 新建鏈接,這裏localhost能夠更換爲服務器ip
 7 # 找到這個郵局,等於鏈接上服務器
 8 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
 9 # 建立頻道
10 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接
11 channel = connection.channel()
12 # 聲明一個隊列,用於接收消息,隊列名字叫「水許傳」
13 channel.queue_declare(queue='水許傳')
14 # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
15 channel.basic_publish(exchange='',
16                       routing_key='水許傳',
17                       body='武松又去打老虎啦2')
18 print("已經發送了消息")
19 # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
20 connection.close()
send.py

能夠同時存在多個接受者,等待接收隊列的消息,默認是輪訓方式分配消息

接受者receive.py,能夠運行屢次,運行多個消費者

 1 import pika
 2 # 創建與rabbitmq的鏈接
 3 credentials = pika.PlainCredentials("s14","123")
 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
 5 channel = connection.channel()
 6 channel.queue_declare(queue="水許傳")
 7 
 8 def callbak(ch,method,properties,body):
 9     print("消費者接收到了任務:%r"%body.decode("utf8"))
10 # 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息
11 # 老百姓開始去郵箱取郵件啦,隊列名字是水許傳
12 channel.basic_consume(callbak,queue="水許傳",no_ack=True)
13 # 開始消費,接收消息
14 channel.start_consuming()
receive.py

應用場景二、單發送多接收

  使用場景:一個發送端,多個接收端,如分佈式的任務派發。爲了保證消息發送的可靠性,不丟失消息,使消息持久化了。同時爲了防止接收端在處理消息時down掉,只有在消息處理完成後才發送ack消息。

Rabbitmq消息確認之ack

  官網資料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

  默認狀況下,生產者發送數據給隊列,消費者取出消息後,數據將被清除。

特殊狀況,若是消費者處理過程當中,出現錯誤,數據處理沒有完成,那麼這段數據將從隊列丟失

no-ack機制

  不確認機制也就是說每次消費者接收到數據後,無論是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除

ACK機制

  ACK機制用於保證消費者若是拿了隊列的消息,客戶端處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取

流程

1.生產者無須變更,發送消息
2.消費者若是no_ack=True啊,數據消費後若是出錯就會丟失
反之no_ack=False,數據消費若是出錯,數據也不會丟失

3.ack機制在消費者代碼中演示

生產者.py 只負責發送數據便可,無須變更

 1 #!/usr/bin/env python
 2 import pika
 3 # 建立憑證,使用rabbitmq用戶密碼登陸
 4 # 去郵局取郵件,必須得驗證身份
 5 credentials = pika.PlainCredentials("s14","123")
 6 # 新建鏈接,這裏localhost能夠更換爲服務器ip
 7 # 找到這個郵局,等於鏈接上服務器
 8 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
 9 # 建立頻道
10 # 建造一個大郵箱,隸屬於這家郵局的郵箱,就是個鏈接
11 channel = connection.channel()
12 # 新建一個hello隊列,用於接收消息
13 # 這個郵箱能夠收發各個班級的郵件,經過
14 channel.queue_declare(queue='金品沒')
15 # 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
16 channel.basic_publish(exchange='',
17                       routing_key='金品沒',
18                       body='潘金蓮又出去。。。')
19 print("已經發送了消息")
20 # 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
21 connection.close() 
View Code

  消費者.py給與ack回覆

  拿到消息必須給rabbitmq服務端回覆ack信息,不然消息不會被刪除,防止客戶端出錯,數據丟失

 1 import pika
 2 
 3 credentials = pika.PlainCredentials("s14","123")
 4 connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.119.10',credentials=credentials))
 5 channel = connection.channel()
 6 
 7 # 聲明一個隊列(建立一個隊列)
 8 channel.queue_declare(queue='金品沒')
 9 
10 def callback(ch, method, properties, body):
11     print("消費者接受到了任務: %r" % body.decode("utf-8"))
12     # int('asdfasdf')
13     # 我告訴rabbitmq服務端,我已經取走了消息
14     # 回覆方式在這
15     ch.basic_ack(delivery_tag=method.delivery_tag)
16 # 關閉no_ack,表明給與服務端ack回覆,確認給與回覆
17 channel.basic_consume(callback,queue='金品沒',no_ack=False)
18 
19 channel.start_consuming()
View Code

 

消息持久化

1 演示
2 1.執行生產者,向隊列寫入數據,產生一個新隊列queue
3 2.重啓服務端,隊列丟失
4 
5 3.開啓生產者數據持久化後,重啓rabbitmq,隊列不丟失
6 4.依舊能夠讀取數據

  消息的可靠性是RabbitMQ的一大特點,那麼RabbitMQ是如何保證消息可靠性的呢——消息持久化。 爲了保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。

生產者.py
消費者.py

  EXchange模型

rabbitmq發送消息首先是發給exchange,而後再經過exchange發送消息給隊列(queue)

exchange有四種模式

fanout

exchange將消息發送給和該exchange鏈接的全部queue;也就是所謂的廣播模式;此模式下忽略routing_key;

direct

路由模式,經過routing_key將消息發送給對應的queue; 以下面這句便可設置exchange爲direct模式,只有routing_key爲「black」時纔將其發送到隊列queue_name;channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

 

在上圖中,Q1和Q2能夠綁定同一個key,如綁定routing_key=‘KeySame’,那麼收到routing_key爲KeySame的消息時將會同時發送給Q1和Q2,退化爲廣播模式;

top

topic模式相似於direct模式,只是其中的routing_key變成了一個有「.」分隔的字符串,「.」將字符串分割成幾個單詞,每一個單詞表明一個條件;

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

官方教程:http://www.rabbitmq.com/tutorials/tutorial-three-python.html

 

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

1 # fanout全部的隊列放一份/給某些隊列發
2 # 傳送消息的模式
3 # 與exchange有關的模式都發
4 exchange_type = fanout

能夠運行屢次,運行多個消費者,等待消息

消費者_訂閱.py
生產者_發佈者.py

實例

1.能夠運行多個消費者,至關於有多個滴滴司機,等待着Exchange同一個電臺發消息
2.運行發佈者,發送消息給Exchange,查看是否給全部的隊列(滴滴司機)發送了消息

關鍵字發佈Exchange

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

 1 路由關鍵字是sb,alex
 2 
 3 # -*- coding: utf-8 -*-
 4 # __author__ = "maple"
 5 import pika
 6  7 credentials = pika.PlainCredentials("root","123")
 8 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
 9 channel = connection.channel()
10 11 # exchange='m1',exchange(祕書)的名稱
12 # exchange_type='fanout' , 祕書工做方式將消息發送給全部的隊列
13 channel.exchange_declare(exchange='m2',exchange_type='direct')
14 15 # 隨機生成一個隊列,隊列退出時,刪除這個隊列
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18 19 # 讓exchange和queque進行綁定,只要
20 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='alex')
21 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')
22 23 24 def callback(ch, method, properties, body):
25     print("消費者接受到了任務: %r" % body)
26 27 channel.basic_consume(callback,queue=queue_name,no_ack=True)
28 29 channel.start_consuming()
30
消費者1.py
 1 路由關鍵字sb
 2 
 3 # -*- coding: utf-8 -*-
 4 # __author__ = "maple"
 5 import pika
 6  7 credentials = pika.PlainCredentials("root","123")
 8 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
 9 channel = connection.channel()
10 11 # exchange='m1',exchange(祕書)的名稱
12 # exchange_type='fanout' , 祕書工做方式將消息發送給全部的隊列
13 channel.exchange_declare(exchange='m2',exchange_type='direct')
14 15 # 隨機生成一個隊列
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18 19 # 讓exchange和queque進行綁定.
20 channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')
21 22 23 def callback(ch, method, properties, body):
24     print("消費者接受到了任務: %r" % body)
25 26 channel.basic_consume(callback,queue=queue_name,no_ack=True)
27 28 channel.start_consuming()
消費者2.py
 1 發送消息給匹配的路由,sb或者alex
 2 
 3 # -*- coding: utf-8 -*-
 4 # __author__ = "yugo"
 5  6  7 import pika
 8 credentials = pika.PlainCredentials("root","123")
 9 connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
10 channel = connection.channel()
11 12 # 路由模式的交換機會發送給綁定的key和routing_key匹配的隊列
13 channel.exchange_declare(exchange='m2',exchange_type='direct')
14 # 發送消息,給有關sb的路由關鍵字
15 channel.basic_publish(exchange='m2',
16                       routing_key='sb',
17                       body='aaaalexlaolelaodi')
18 19 connection.close()
生產者.py

RPC之遠程過程調用

將一個函數運行在遠程計算機上而且等待獲取那裏的結果,這個稱做遠程過程調用(Remote Procedure Call)或者 RPC。

RPC是一個計算機通訊協議。

比喻

將計算機服務運行理解爲廚師作飯,廚師想作一個小蔥拌豆腐,廚師須要洗小蔥、切豆腐、調汁、涼拌。他一我的完成全部的事,如同古老的集中式應用,一臺計算機作全部的事。

製做小蔥拌豆腐{
  廚師>洗小蔥>切豆腐>涼拌
}

rpc應用場景

而現在,飯店作大了,有錢了,專職分工來幹活,再也不是廚師單打獨鬥,備菜師傅準備小蔥、豆腐,切菜師傅切小蔥、豆腐,廚師只負責調味,完成食品。

製做小蔥拌豆腐{
  備菜師>洗菜
  切菜師>切菜
  廚師>調味
}

此時一件事好多人在作,廚師就得和其餘人溝通,通知備菜、洗菜師傅的這個動做就是遠程過程調用(RPC)。

這個過程在計算機系統中,一個電商的下單過程,涉及物流、支付、庫存、紅包等多個系統,多個系統又在多個服務器上,由不一樣的技術團隊負責,整個下單過程,須要全部團隊進行遠程調用。

下單{
  庫存>減小庫存
  支付>扣款
  紅包>減免紅包
  物流>生成訂單
}

什麼是rpc

rpc指的是在計算機A上的進程,調用另一臺計算機B的進程,A上的進程被掛起,B上的被調用進程開始執行後,產生返回值給A,A繼續執行。
調用方能夠經過參數將信息傳遞給被調用方,然後經過返回結果獲得信息,這個過程對於開發人員來講是透明的。

如同廚師同樣,服務員把菜單給後廚,廚師告訴洗菜人,備菜人,開始工做,完成工做後,整個過程對於服務員是透明的,他徹底不用管後廚是怎麼把菜作好的。

因爲服務在不一樣的機器上,遠程調用必經網絡通訊,調用服務必須寫一坨網絡通訊代碼,很容易出錯且很複雜,所以就出現了RPC框架。

阿里巴巴的 Dubbo  java
新浪的 Motan java
谷歌的 gRPC 多語言
Apache thrift 多語言

rpc封裝了數據的序列化,反序列化,以及傳輸協議

python實現RPC

  利用RabbitMQ構建一個RPC系統,包含了客戶端和RPC服務器,依舊使用pika模塊

Callback queue回調隊列

  一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to

Correlation id 關聯標識

  一個客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端沒法辨別在回調隊列中的響應具體和那個請求時對應的。爲了處理這種狀況,客戶端在發送每一個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就能夠分辨此響應屬於哪一個請求。

客戶端發送請求:某個應用將請求信息交給客戶端,而後客戶端發送RPC請求,在發送RPC請求到RPC請求隊列時,客戶端至少發送帶有reply_to以及correlation_id兩個屬性的信息

服務器端工做流: 等待接受客戶端發來RPC請求,當請求出現的時候,服務器從RPC請求隊列中取出請求,而後處理後,將響應發送到reply_to指定的回調隊列中

客戶端接受處理結果: 客戶端等待回調隊列中出現響應,當響應出現時,它會根據響應中correlation_id字段的值,將其返回給對應的應用

過程
1.啓動rpc客戶端,等待接收數據到來,來了以後就進行處理,再將結果丟進隊列
2.啓動rpc服務端,發起請求
rpc_server.py
 1 import pika
 2 # 創建鏈接,服務器地址爲localhost,可指定ip地址
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(
 4     host='192.168.119.10'))
 5 # 創建會話
 6 channel = connection.channel()
 7 # 聲明RPC請求隊列
 8 channel.queue_declare(queue='s14rpc')
 9 
10 # 模擬一個進程,例如切菜師傅,等着洗菜師傅傳遞數據
11 def sum(n):
12     n+=100
13     return n
14 # 對RPC請求隊列中的請求進行處理
15 
16 
17 def on_request(ch, method, props, body):
18     print(body,type(body))
19     n = int(body)
20     print(" 正在處理sum(%s)" % n)
21     # 調用數據處理方法
22     response = sum(n)
23     # 將處理結果(響應)發送到回調隊列
24     ch.basic_publish(exchange='',
25                      # reply_to表明回覆目標
26                      routing_key=props.reply_to,
27                      # correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
28                      properties=pika.BasicProperties(correlation_id= \
29                                                          props.correlation_id),
30                      body=str(response))
31     ch.basic_ack(delivery_tag=method.delivery_tag)
32 
33 # 負載均衡,同一時刻發送給該服務器的請求不超過一個
34 channel.basic_qos(prefetch_count=1)
35 channel.basic_consume(on_request, queue='s14rpc')
36 print("等待接收rpc請求")
37 
38 
39 #開始消費
40 channel.start_consuming()
rpc_client.py
相關文章
相關標籤/搜索