消息隊列rabbitmq/kafka

12.1 rabbitMQ

1. 你瞭解的消息隊列

rabbitmq是一個消息代理,它接收和轉發消息,能夠理解爲是生活的郵局。
你能夠將郵件放在郵箱裏,你能夠肯定有郵遞員會發送郵件給收件人。
歸納:
rabbitmq是接收,存儲,轉發數據的。
官方教程:http://www.rabbitmq.com/tutorials/tutorial-one-python.html

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。html

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。前端

2. 公司在什麼狀況下會用消息隊列?

1.電商訂單

想必同窗們都點過外賣,點擊下單後的業務邏輯可能包括:檢查庫存、生成單據、發紅包、短信通知等,若是這些業務同步執行,完成下單率會很是低,如發紅包,短信通知等沒必要要的流程,異步執行便可。java

此時使用MQ,能夠在覈心流程(扣減庫存、生成訂單記錄)等完成後發送消息到MQ,快速結束本次流程。消費者拉取MQ消息時,發現紅包、短信等消息時,再進行處理。python

場景:雙11是購物狂節,用戶下單後,訂單系統須要通知庫存系統,傳統的作法就是訂單系統調用庫存系統的接口centos

 

這種作法有一個缺點:緩存

  • 當庫存系統出現故障時,訂單就會失敗。(這樣馬雲將少賺好多好多錢錢。。。。)服務器

  • 訂單系統和庫存系統高耦合. 網絡

引入消息隊列app

 

  • 訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。負載均衡

  • 庫存系統:訂閱下單的消息,獲取下單消息,進行庫操做。 就算庫存系統出現故障,消息隊列也能保證消息的可靠投遞,不會致使消息丟失(馬雲這下高興了,鈔票快快的來呀~~).

2.秒殺活動

流量削峯通常在秒殺活動中應用普遍 場景:秒殺活動,通常會由於流量過大,致使應用掛掉,爲了解決這個問題,通常在應用前端加入消息隊列。 做用: 1.能夠控制活動人數,超過此必定閥值的訂單直接丟棄(怪不得我一次秒殺都沒搶到過。。。。。wtf???)

2.能夠緩解短期的高流量壓垮應用(應用程序按本身的最大處理能力獲取訂單)

 

 

3.用戶的請求,服務器接收到以後,寫入消息隊列,超過定義的閾值就直接丟棄請求,或者跳轉錯誤頁面。

4.業務系統取出隊列中的消息,再作後續處理。

 

3. rabbitMQ安裝

rabbitmq-server服務端

1.下載centos源
wget -O /etc/yum.repos.d/CentOS-Base.repo   http://mirrors.cloud.tencent.com/repo/centos7_base.repo
2.下載epel源
wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
3.清空yum緩存而且生成新的yum緩存
yum clean all
yum makecache
4.安裝erlang
  $ yum -y install erlang
5.安裝RabbitMQ
  $ yum -y install rabbitmq-server
6.啓動(無用戶名密碼):
  systemctl start/stop/restart/status rabbitmq-server

設置rabbitmq帳號密碼,以及角色權限設置

# 設置新用戶yugo 密碼123
sudo rabbitmqctl add_user yugo 123

# 設置用戶爲administrator角色
sudo rabbitmqctl set_user_tags yugo administrator

# 設置權限,容許對全部的隊列都有權限
sudo rabbitmqctl set_permissions -p "/" yugo ".*" ".*" ".*"

#重啓服務生效設置
service rabbitmq-server start/stop/restart

rabbitmq相關命令

// 新建用戶
rabbitmqctl add_user {用戶名} {密碼}

// 設置權限
rabbitmqctl set_user_tags {用戶名} {權限}

// 查看用戶列表
rabbitmqctl list_users

// 爲用戶受權
添加 Virtual Hosts :    
rabbitmqctl add_vhost <vhost>    

// 刪除用戶
rabbitmqctl delete_user Username

// 修改用戶的密碼
rabbitmqctl change_password Username Newpassword
   
// 刪除 Virtual Hosts :    
rabbitmqctl delete_vhost <vhost>    
   
// 添加 Users :    
rabbitmqctl add_user <username> <password>    
rabbitmqctl set_user_tags <username> <tag> ...    
rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>    
   
// 刪除 Users :    
delete_user <username>  

// 使用戶user1具備vhost1這個virtual host中全部資源的配置、寫、讀權限以便管理其中的資源
rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'

// 查看權限
rabbitmqctl list_user_permissions user1

rabbitmqctl list_permissions -p vhost1

// 清除權限
rabbitmqctl clear_permissions [-p VHostPath] User

//清空隊列步驟
rabbitmqctl reset
須要提早關閉應用rabbitmqctl stop_app ,
而後再清空隊列,啓動應用
rabbitmqctl start_app
此時查看隊列rabbitmqctl list_queues

查看全部的exchange: rabbitmqctl list_exchanges
查看全部的queue: rabbitmqctl list_queues
查看全部的用戶:   rabbitmqctl list_users
查看全部的綁定(exchange和queue的綁定信息): rabbitmqctl list_bindings
查看消息確認信息:
rabbitmqctl list_queues name messages_ready messages_unacknowledged
查看RabbitMQ狀態,包括版本號等信息:rabbitmqctl status

鏈接客戶端

// rabbitmq官方推薦的python客戶端pika模塊
pip3 install pika

生產-消費者模型

P 是生產者
C 是消費者
中間hello是消息隊列
能夠有多個P、多個C

P發送消息給hello隊列,C消費者從隊列中獲取消息,默認輪詢方式

 

 

生產者send.py

咱們的第一個程序send.py將向隊列發送一條消息。咱們須要作的第一件事是創建與RabbitMQ服務器的鏈接。

#!/usr/bin/env python
import pika
# 建立憑證,使用rabbitmq用戶密碼登陸
credentials = pika.PlainCredentials("root","123")
# 新建鏈接,這裏localhost能夠更換爲服務器ip
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
# 建立頻道
channel = connection.channel()
# 新建一個hello隊列,用於接收消息
channel.queue_declare(queue='oldboypython')
# 注意在rabbitmq中,消息想要發送給隊列,必須通過交換(exchange),初學可使用空字符串交換(exchange=''),它容許咱們精確的指定發送給哪一個隊列(routing_key=''),參數body值發送的數據
channel.basic_publish(exchange='',
                    routing_key='oldboypython',
                    body='msg6')
print("已經發送了消息")
# 程序退出前,確保刷新網絡緩衝以及消息發送給rabbitmq,須要關閉本次鏈接
connection.close()

 

 

能夠同時存在多個接受者,等待接收隊列的消息,默認是輪訓方式分配消息

接受者receive.py,能夠運行屢次,運行多個消費者

import pika
# 創建與rabbitmq的鏈接
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()


channel.queue_declare(queue="oldboypython")

def callbak(ch,method,properties,body):
  print("消費者接收到了任務:%r"%body)
# 有消息來臨,當即執行callbak,沒有消息則夯住,等待消息
channel.basic_consume(callbak,queue="oldboypython",no_ack=True)
# 開始消費,接收消息
channel.start_consuming()    

練習:

分別啓動生產者、兩個消費者,往隊列發送數據,查看消費者的結果

rabbitmq消息確認之ack

官網資料:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

默認狀況下,生產者發送數據給隊列,消費者取出消息後,數據將被清除。

特殊狀況,若是消費者處理過程當中,出現錯誤,數據處理沒有完成,那麼這段數據將從隊列丟失

no-ack機制

不確認機制也就是說每次消費者接收到數據後,無論是否處理完畢,rabbitmq-server都會把這個消息標記完成,從隊列中刪除

ACK機制

ACK機制用於保證消費者若是拿了隊列的消息,客戶端處理時出錯了,那麼隊列中仍然還存在這個消息,提供下一位消費者繼續取

生產者.py 只負責發送數據便可

import pika
# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61'))

# 有密碼
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()
# 聲明一個隊列(建立一個隊列)
channel.queue_declare(queue='s13q2')

channel.basic_publish(exchange='',
                    routing_key='s13q2', # 關鍵字查找到隊列名
                    body='msg8')
connection.close()

消費者.py給與ack回覆

拿到消息必須給rabbitmq服務端回覆ack信息,不然消息不會被刪除,防止客戶端出錯,數據丟失

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 聲明一個隊列(建立一個隊列)
channel.queue_declare(queue='s13q2')

def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)
  # int('asdfasdf')
  # 我告訴rabbitmq服務端,我已經取走了消息
  ch.basic_ack(delivery_tag=method.delivery_tag)
# 關閉no_ack,給與服務端ack回覆
channel.basic_consume(callback,queue='s13q2',no_ack=False)

channel.start_consuming()

消息持久化

消息的可靠性是RabbitMQ的一大特點,那麼RabbitMQ是如何保證消息可靠性的呢——消息持久化。 爲了保證RabbitMQ在退出或者crash等異常狀況下數據沒有丟失,須要將queue,exchange和Message都持久化。

生產者.py

import pika
# 無密碼
# connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61'))

# 有密碼
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()
# 聲明一個隊列(建立一個隊列)
# 默認此隊列不支持持久化,若是服務掛掉,數據丟失
# durable=True 開啓持久化,必須新開啓一個隊列,本來的隊列已經不支持持久化了
channel.queue_declare(queue='music',durable=True)

channel.basic_publish(exchange='',
                    routing_key='music', # 消息隊列名稱
                    body='haohaio4',
                    # 支持數據持久化
                    properties=pika.BasicProperties(
                        delivery_mode=2,
                    )
                    )
connection.close()

消費者.py

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 聲明一個隊列(建立一個隊列)
channel.queue_declare(queue='music',durable=True)

def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)
  # 模擬代碼報錯
  # int('asdfasdf')

  # 此處報錯,沒有給予回覆,保證客戶端掛掉,數據不丟失
  # 告訴服務端,我已經取走了數據
  # ch.basic_ack(delivery_tag=method.delivery_tag)

# 關閉no_ack,表明給與回覆確認
channel.basic_consume(callback,queue='music',no_ack=False)

channel.start_consuming()

Exchange模型

rabbitmq發送消息首先是發給exchange,而後再經過exchange發送消息給隊列(queue)

exchange有四種模式

fanout

exchange將消息發送給和該exchange鏈接的全部queue;也就是所謂的廣播模式;此模式下忽略routing_key;

direct

路由模式,經過routing_key將消息發送給對應的queue; 以下面這句便可設置exchange爲direct模式,只有routing_key爲「black」時纔將其發送到隊列queue_name; channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

 

在上圖中,Q1和Q2能夠綁定同一個key,如綁定routing_key=‘KeySame’,那麼收到routing_key爲KeySame的消息時將會同時發送給Q1和Q2,退化爲廣播模式;

top

topic模式相似於direct模式,只是其中的routing_key變成了一個有「.」分隔的字符串,「.」將字符串分割成幾個單詞,每一個單詞表明一個條件;

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

官方教程:http://www.rabbitmq.com/tutorials/tutorial-three-python.html

 

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

# fanout全部的隊列放一份/給某些隊列發
# 傳送消息的模式
# 與exchange有關的模式都發
exchange_type = fanout

消費者_訂閱.py

能夠運行屢次,運行多個消費者,等待消息

import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(祕書)的名稱
# exchange_type='fanout' , 祕書工做方式將消息發送給全部的隊列
channel.exchange_declare(exchange='m1',exchange_type='fanout')

# 隨機生成一個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m1',queue=queue_name)


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

生產者_發佈者.py

# -*- coding: utf-8 -*-
# __author__ = "yugo"


import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 指定exchange
channel.exchange_declare(exchange='m1',exchange_type='fanout')

channel.basic_publish(exchange='m1',
                    routing_key='',# 這裏再也不指定隊列,由exchange分配,若是是fanout模式,每個隊列放一份
                    body='haohaio')

connection.close()

實例

1.能夠運行多個消費者,至關於有多個滴滴司機,等待着Exchange同一個電臺發消息
2.運行發佈者,發送消息給Exchange,查看是否給全部的隊列(滴滴司機)發送了消息

關鍵字發佈Exchange

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

 

消費者1.py

路由關鍵字是sb,alex

# -*- coding: utf-8 -*-
# __author__ = "maple"
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(祕書)的名稱
# exchange_type='fanout' , 祕書工做方式將消息發送給全部的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 隨機生成一個隊列,隊列退出時,刪除這個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定,只要
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='alex')
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()


消費者2.py

路由關鍵字sb

# -*- coding: utf-8 -*-
# __author__ = "maple"
import pika

credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# exchange='m1',exchange(祕書)的名稱
# exchange_type='fanout' , 祕書工做方式將消息發送給全部的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')

# 隨機生成一個隊列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 讓exchange和queque進行綁定.
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='sb')


def callback(ch, method, properties, body):
  print("消費者接受到了任務: %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

生產者.py

發送消息給匹配的路由,sb或者alex

# -*- coding: utf-8 -*-
# __author__ = "yugo"


import pika
credentials = pika.PlainCredentials("root","123")
connection = pika.BlockingConnection(pika.ConnectionParameters('123.206.16.61',credentials=credentials))
channel = connection.channel()

# 路由模式的交換機會發送給綁定的key和routing_key匹配的隊列
channel.exchange_declare(exchange='m2',exchange_type='direct')
# 發送消息,給有關sb的路由關鍵字
channel.basic_publish(exchange='m2',
                    routing_key='sb',
                    body='aaaalexlaolelaodi')

connection.close()

RPC之遠程過程調用

將一個函數運行在遠程計算機上而且等待獲取那裏的結果,這個稱做遠程過程調用(Remote Procedure Call)或者 RPC。RPC是一個計算機通訊協議。

比喻

將計算機服務運行理解爲廚師作飯,廚師想作一個小蔥拌豆腐,廚師須要洗小蔥、切豆腐、調汁、涼拌。他一我的完成全部的事,如同古老的集中式應用,一臺計算機作全部的事。

製做小蔥拌豆腐{
  廚師>洗小蔥>切豆腐>涼拌
}

rpc應用場景

而現在,飯店作大了,有錢了,專職分工來幹活,再也不是廚師單打獨鬥,備菜師傅準備小蔥、豆腐,切菜師傅切小蔥、豆腐,廚師只負責調味,完成食品。

製做小蔥拌豆腐{
  備菜師>洗菜
  切菜師>切菜
  廚師>調味
}

此時一件事好多人在作,廚師就得和其餘人溝通,通知備菜、洗菜師傅的這個動做就是遠程過程調用(RPC)。

這個過程在計算機系統中,一個電商的下單過程,涉及物流、支付、庫存、紅包等多個系統,多個系統又在多個服務器上,由不一樣的技術團隊負責,整個下單過程,須要全部團隊進行遠程調用。

下單{
  庫存>減小庫存
  支付>扣款
  紅包>減免紅包
  物流>生成訂單
}

到底什麼是rpc

rpc指的是在計算機A上的進程,調用另一臺計算機B的進程,A上的進程被掛起,B上的被調用進程開始執行後,產生返回值給A,A繼續執行。
調用方能夠經過參數將信息傳遞給被調用方,然後經過返回結果獲得信息,這個過程對於開發人員來講是透明的。

如同廚師同樣,服務員把菜單給後廚,廚師告訴洗菜人,備菜人,開始工做,完成工做後,整個過程對於服務員是透明的,他徹底不用管後廚是怎麼把菜作好的。

因爲服務在不一樣的機器上,遠程調用必經網絡通訊,調用服務必須寫一坨網絡通訊代碼,很容易出錯且很複雜,所以就出現了RPC框架。

阿里巴巴的 Dubbo  java
新浪的 Motan java
谷歌的 gRPC 多語言
Apache thrift 多語言

python實現RPC

利用RabbitMQ構建一個RPC系統,包含了客戶端和RPC服務器,依舊使用pika模塊

Callback queue 回調隊列

一個客戶端向服務器發送請求,服務器端處理請求後,將其處理結果保存在一個存儲體中。而客戶端爲了得到處理結果,那麼客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to

Correlation id 關聯標識

一個客戶端可能會發送多個請求給服務器,當服務器處理完後,客戶端沒法辨別在回調隊列中的響應具體和那個請求時對應的。爲了處理這種狀況,客戶端在發送每一個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就能夠分辨此響應屬於哪一個請求。

客戶端發送請求:某個應用將請求信息交給客戶端,而後客戶端發送RPC請求,在發送RPC請求到RPC請求隊列時,客戶端至少發送帶有reply_to以及correlation_id兩個屬性的信息

服務器端工做流: 等待接受客戶端發來RPC請求,當請求出現的時候,服務器從RPC請求隊列中取出請求,而後處理後,將響應發送到reply_to指定的回調隊列中

客戶端接受處理結果: 客戶端等待回調隊列中出現響應,當響應出現時,它會根據響應中correlation_id字段的值,將其返回給對應的應用

rpc_server.py

import pika
import uuid

class FibonacciRpcClient(object):
  def __init__(self):

      # 客戶端啓動時,建立回調隊列,會開啓會話用於發送RPC請求以及接受響應

      # 創建鏈接,指定服務器的ip地址
      self.connection = pika.BlockingConnection(pika.ConnectionParameters(
          host='123.206.16.61'))

      # 創建一個會話,每一個channel表明一個會話任務
      self.channel = self.connection.channel()

      # 聲明回調隊列,再次聲明的緣由是,服務器和客戶端可能前後開啓,該聲明是冪等的,屢次聲明,但只生效一次
      result = self.channel.queue_declare(exclusive=True)
      # 將次隊列指定爲當前客戶端的回調隊列
      self.callback_queue = result.method.queue

      # 客戶端訂閱回調隊列,當回調隊列中有響應時,調用`on_response`方法對響應進行處理;
      self.channel.basic_consume(self.on_response, no_ack=True,
                                  queue=self.callback_queue)

  # 對回調隊列中的響應進行處理的函數
  def on_response(self, ch, method, props, body):
      if self.corr_id == props.correlation_id:
          self.response = body

  # 發出RPC請求
  def call(self, n):

      # 初始化 response
      self.response = None

      # 生成correlation_id 關聯標識
      self.corr_id = str(uuid.uuid4())

      # 發送RPC請求內容到RPC請求隊列`rpc_queue`,同時發送的還有`reply_to`和`correlation_id`
      self.channel.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  properties=pika.BasicProperties(
                                      reply_to=self.callback_queue,
                                      correlation_id=self.corr_id,
                                  ),
                                  body=str(n))

      while self.response is None:
          self.connection.process_data_events()
      return int(self.response)


# 創建客戶端
fibonacci_rpc = FibonacciRpcClient()

# 發送RPC請求
print(" [x] Requesting sum(30)")
response = fibonacci_rpc.call(40)
print(" [.] Got %r" % response)


rpc_client.py

import pika

# 創建鏈接,服務器地址爲localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
  host='123.206.16.61'))

# 創建會話
channel = connection.channel()

# 聲明RPC請求隊列
channel.queue_declare(queue='rpc_queue')


# 數據處理方法
def sum(n):
  n+=100
  return n



# 對RPC請求隊列中的請求進行處理
def on_request(ch, method, props, body):
  n = int(body)

  print(" [.] sum(%s)" % n)

  # 調用數據處理方法
  response = sum(n)

  # 將處理結果(響應)發送到回調隊列
  ch.basic_publish(exchange='',
                    # reply_to表明回覆目標
                    routing_key=props.reply_to,
                    # correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
                    properties=pika.BasicProperties(correlation_id= \
                                                        props.correlation_id),
                    body=str(response))
  ch.basic_ack(delivery_tag=method.delivery_tag)


# 負載均衡,同一時刻發送給該服務器的請求不超過一個
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
相關文章
相關標籤/搜索