RabbitMQ實踐

一 概述

1.1 背景

近期在作告警集成平臺,其中須要告警消息發送,類型須要涵蓋目前市場主流的消息接受端,例如微信/企業微信/釘釘/郵件/短信/電話等等,這勢必要利用到MQ,在衆多的消息中間件中,通過調研此場景並不象大數據處理場景須要kafka,同時須要較高性能和確認機制,數據的可靠性和活躍的社區,支持消息的持久化於中間件的高可用部署,最終選型了RabbitMQ來做爲應用的中間件。html

1.2 概念

MQ全稱爲Message Queue, 即消息隊列。MQ是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。RabbitMQ則是一個在AMQP基礎上完整的,可複用的企業消息系統。node

1.3 功能

  • 應用解耦:mq基於數據的接口層,將耦合的應用來分解開,兩邊都實現這個接口,這樣就容許獨立的修改或者擴展兩邊的處理過程,只要兩邊遵照相同的接口約束便可。
  • 流量削峯:在高併發,大流量的場景下,rabbitmq能夠減小突發訪問壓力,不會由於突發的超時負荷要求而崩潰
  • 異步通訊:經過把把消息發送給消息中間件,將不是實時的業務異步處理

1.4 特色

  • 可靠性:RabbitMQ使用一些機制來保證可靠性,如持久化、傳輸確認及發佈確認等。
  • 靈活的路由:在消息進入隊列以前,經過交換器來路由消息。對於典型的路由功能,RabbitMQ己經提供了一些內置的交換器來實現。針對更復雜的路由功能,能夠將多個交換器綁定在一塊兒,也能夠經過插件機制來實現本身的交換器。
  • 擴展性:多個RabbitMQ節點能夠組成一個集羣,也能夠根據實際業務狀況動態地擴展集羣中節點。
  • 高可用性:隊列能夠在集羣中的機器上設置鏡像,使得在部分節點出現問題的狀況下隊仍然可用。
  • 多種協議:RabbitMQ除了原生支持AMQP協議,還支持STOMP,MQTT等多種消息中間件協議。
  • 多語言客戶端:RabbitMQ幾乎支持全部經常使用語言,好比Jav a、Python、Ruby、PHP、C#、JavaScript等。
  • 管理界面:RabbitMQ提供了一個易用的用戶界面,使得用戶能夠監控和管理消息、集羣中的節點等。
  • 插件機制:RabbitMQ提供了許多插件,以實現從多方面進行擴展,固然也能夠編寫本身的插件。

二 架構

2.1 架構圖

圖片描述
圖片描述
圖片描述

  • RabbitMQ Server

也叫Broker Server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn't a food truck, it's a delivery service. 它的角色就是維護一條從Producer到Consumer的路線,保證數據可以按照指定的方式進行傳輸。雖然這個保證也不是100%的保證,可是對於普通的應用來講這已經足夠了。固然對於商業系統來講,能夠再作一層數據一致性的guard,就能夠完全保證系統的一致性了。python

  • Client P

也叫Producer,數據的發送方。Create messages and publish (send) them to a Broker Server (RabbitMQ)。一個Message有兩個部分:payload(有效載荷)和label(標籤)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,並且RabbitMQ也是經過這個label來決定把這個Message發給哪一個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。git

  • Client C

也叫Consumer,數據的接收方。Consumers attach to a Broker Server (RabbitMQ) and subscribe to a queue。把queue比做是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。固然可能會把同一個Message發送給不少的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來講,它是不知道誰發送的這個信息的,就是協議自己不支持。固然了,若是Producer發送的payload包含了Producer的信息就另當別論了。github

  • Connection

就是一個TCP的鏈接。Producer和Consumer都是經過TCP鏈接到RabbitMQ Server的。之後咱們能夠看到,程序的起始處就是創建這個TCP鏈接。算法

  • Channel

虛擬鏈接。它創建在上述的TCP鏈接中。數據流動都是在Channel中進行的。也就是說,通常狀況是程序起始創建TCP鏈接,第二步就是創建這個Channel。shell

那麼,爲何使用Channel,而不是直接使用TCP鏈接?vim

對於OS來講,創建和關閉TCP鏈接是有代價的,頻繁的創建關閉TCP鏈接對於系統的性能有很大的影響,並且TCP的鏈接數也有限制,這也限制了系統處理高併發的能力。可是,在TCP鏈接中創建Channel是沒有上述代價的。對於Producer或者Consumer來講,能夠併發的使用多個Channel進行Publish或者Receive。有實驗代表,1s的數據能夠Publish10K的數據包。固然對於不一樣的硬件環境,不一樣的數據包大小這個數據確定不同,可是我只想說明,對於普通的Consumer或者Producer來講,這已經足夠了。若是不夠用,你考慮的應該是如何細化SPLIT你的設計。centos

2.2 相關定義

  • Broker: 簡單來講就是消息隊列服務器實體
  • Exchange: 消息交換機,它指定消息按什麼規則,路由到哪一個隊列
  • Queue: 消息隊列載體,每一個消息都會被投入到一個或多個隊列
  • Binding: 綁定,它的做用就是把exchange和queue按照路由規則綁定起來
  • Routing Key: 路由關鍵字,exchange根據這個關鍵字進行消息投遞
  • VHost: 虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
  • Producer: 消息生產者,就是投遞消息的程序
  • Consumer: 消息消費者,就是接受消息的程序
  • Channel: 消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務

由Exchange、Queue、RoutingKey三個才能決定一個從Exchange到Queue的惟一的線路。安全

2.3 基本概念

Connection Factory、Connection、Channel都是RabbitMQ對外提供的API中最基本的對象。Connection是RabbitMQ的socket連接,它封裝了socket協議相關部分邏輯。Connection Factory則是Connection的製造工廠。

Channel是咱們與RabbitMQ打交道的最重要的一個接口,咱們大部分的業務操做是在Channel這個接口中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發佈消息等。

  • Queue

Queue(隊列)是RabbitMQ的內部對象,用於存儲消息,以下圖表示。

圖片描述

RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)能夠從Queue中獲取消息並消費。

圖片描述
多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。
圖片描述

  • Message acknowledgment

在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其餘意外)的狀況,這種狀況下就可能會致使消息丟失。爲了不這種狀況發生,咱們能夠要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後纔將該消息從Queue中移除。

若是RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ鏈接斷開,則RabbitMQ會將該消息發送給其餘消費者(若是存在多個消費者)進行處理。這裏不存在timeout,一個消費者處理消息時間再長也不會致使該消息被髮送給其餘消費者,除非它的RabbitMQ鏈接斷開。

這裏會產生另一個問題,若是咱們的開發人員在處理完業務邏輯後,忘記發送回執給RabbitMQ,這將會致使嚴重的bug——Queue中堆積的消息會愈來愈多。消費者重啓後會重複消費這些消息並重復執行業務邏輯。

另外publish message 是沒有ACK的。

  • Message durability

若是咱們但願即便在RabbitMQ服務重啓的狀況下,也不會丟失消息,咱們能夠將Queue與Message都設置爲可持久化的(durable),這樣能夠保證絕大部分狀況下咱們的RabbitMQ消息不會丟失。但依然解決不了小几率丟失事件的發生(好比RabbitMQ服務器已經接收到生產者的消息,但還沒來得及持久化該消息時RabbitMQ服務器就斷電了),若是咱們須要對這種小几率事件也要管理起來,那麼咱們要用到事務。因爲這裏僅爲RabbitMQ的簡單介紹,因此這裏將不講解RabbitMQ相關的事務。

  • Prefetch count

前面咱們講到若是有多個消費者同時訂閱同一個Queue中的消息,Queue中的消息會被平攤給多個消費者。這時若是每一個消息的處理時間不一樣,就有可能會致使某些消費者一直在忙,而另一些消費者很快就處理完手頭工做並一直空閒的狀況。咱們能夠經過設置Prefetch count來限制Queue每次發送給每一個消費者的消息數,好比咱們設置prefetchCount=1,則Queue每次給每一個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。

圖片描述

  • Exchange

在上一節咱們看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器,下圖中的X),由Exchange將消息路由到一個或多個Queue中(或者丟棄)。

Exchange是按照什麼邏輯將消息路由到Queue的?這個將在Binding一節中介紹。

RabbitMQ中的Exchange有四種類型,不一樣的類型有着不一樣的路由策略,這將在Exchange Types一節介紹。

  • Routing Key

生產者在將消息發送給Exchange的時候,通常會指定一個Routing Key,來指定這個消息的路由規則,而這個Routing Key須要與Exchange Type及Binding key聯合使用才能最終生效。

在Exchange Type與Binding key固定的狀況下(在正常使用時通常這些內容都是固定配置好的),咱們的生產者就能夠在發送消息給Exchange時,經過指定Routing Key來決定消息流向哪裏。

RabbitMQ爲Routing Key設定的長度限制爲255 bytes。

  • Binding

RabbitMQ中經過Binding將Exchange與Queue關聯起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。

圖片描述

  • Binding key

在綁定(Binding)Exchange與Queue的同時,通常會指定一個Binding key。消費者將消息發送給Exchange時,通常會指定一個Routing Key。當 Binding key與Routing Key相匹配時,消息將會被路由到對應的Queue中。這個將在Exchange Types章節會列舉實際的例子加以說明。

在綁定多個Queue到同一個Exchange的時候,這些Binding容許使用相同的Binding key。

Binding key並非在全部狀況下都生效,它依賴於Exchange Type,好比fanout類型的Exchange就會無視Binding key,而是將消息路由到全部綁定到該Exchange的Queue。

  • Exchange Types

RabbitMQ經常使用的Exchange Type有fanout、direct、topic、headers這四種(AMQP規範裏還提到兩種Exchange Type,分別爲system與自定義,這裏不予以描述),下面分別進行介紹。

  • fanout

fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。

圖片描述

上圖中,生產者(P)發送到Exchange(X)的全部消息都會路由到圖中的兩個Queue,並最終被兩個消費者(C1與C2)消費。

  • direct

圖片描述

direct類型的Exchange路由規則也很簡單,它會把消息路由到那些Binding key與Routing key徹底匹配的Queue中。

圖片描述

以上圖的配置爲例,咱們以routingKey="error"發送消息到Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…);若是咱們以Routing Key="info"或routingKey="warning"來發送消息,則消息只會路由到Queue2。若是咱們以其餘Routing Key發送消息,則消息不會路由到這兩個Queue中。

  • topic

前面講到direct類型的Exchange路由規則是徹底匹配Binding Key與Routing Key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到Binding Key與Routing Key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定:

Routing Key爲一個句點號「.」分隔的字符串(咱們將被句點號". "分隔開的每一段獨立的字符串稱爲一個單詞),如"stock.usd.nyse"、"nyse.vmw"、"quick.orange.rabbit"。Binding Key與Routing Key同樣也是句點號「. 」分隔的字符串。

Binding Key中能夠存在兩種特殊字符""與"#",用於作模糊匹配,其中""用於匹配一個單詞,"#"用於匹配多個單詞(能夠是零個)。

圖片描述
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。

  • headers

headers類型的Exchange不依賴於Routing Key與Binding Key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

在綁定Queue與Exchange時指定一組鍵值對;當消息發送到Exchange時,RabbitMQ會取到該消息的headers(也是一個鍵值對的形式),對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對。若是徹底匹配則消息會路由到該Queue,不然不會路由到該Queue。

該類型的Exchange沒有用到過(不過也應該頗有用武之地),因此不作介紹。

  • RPC

MQ自己是基於異步的消息處理,前面的示例中全部的生產者(P)將消息發送到RabbitMQ後不會知道消費者(C)處理成功或者失敗(甚至連有沒有消費者來處理這條消息都不知道)。

但實際的應用場景中,咱們極可能須要一些同步處理,須要同步等待服務端將個人消息處理完成後再進行下一步處理。這至關於RPC(Remote Procedure Call,遠程過程調用)。在RabbitMQ中也支持RPC。

圖片描述

  • RabbitMQ中實現RPC的機制是:

客戶端發送請求(消息)時,在消息的屬性(Message Properties,在AMQP協議中定義了14種properties,這些屬性會隨着消息一塊兒發送)中設置兩個值replyTo(一個Queue名稱,用於告訴服務器處理完成後將通知個人消息發送到這個Queue中)和correlationId(這次請求的標識號,服務器處理完成後須要將此屬性返還,客戶端將根據這個id瞭解哪條請求被成功執行了或執行失敗)。服務器端收到消息處理完後,將生成一條應答消息到replyTo指定的Queue,同時帶上correlationId屬性。客戶端以前已訂閱replyTo指定的Queue,從中收到服務器的應答消息後,根據其中的correlationId屬性分析哪條請求被執行了,根據執行結果進行後續業務處理。

2.4 細節闡明

2.4.1 使用ACK確認Message的正確傳遞

默認狀況下,若是Message 已經被某個Consumer正確的接收到了,那麼該Message就會被從Queue中移除。固然也可讓同一個Message發送到不少的Consumer。

若是一個Queue沒被任何的Consumer Subscribe(訂閱),當有數據到達時,這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被當即發送到這個Consumer。這個數據被Consumer正確收到時,這個數據就被從Queue中刪除。

那麼什麼是正確收到呢?經過ACK。每一個Message都要被acknowledged(確認,ACK)。咱們能夠顯示的在程序中去ACK,也能夠自動的ACK。若是有數據沒有被ACK,那麼RabbitMQ Server會把這個信息發送到下一個Consumer。

若是這個APP有bug,忘記了ACK,那麼RabbitMQ Server不會再發送數據給它,由於Server認爲這個Consumer處理能力有限。並且ACK的機制能夠起到限流的做用(Benefitto throttling):在Consumer處理完成數據後發送ACK,甚至在額外的延時後發送ACK,將有效的balance Consumer的load。

固然對於實際的例子,好比咱們可能會對某些數據進行merge,好比merge 4s內的數據,而後sleep 4s後再獲取數據。特別是在監聽系統的state,咱們不但願全部的state實時的傳遞上去,而是但願有必定的延時。這樣能夠減小某些IO,並且終端用戶也不會感受到。

2.4.2 Reject a message

有兩種方式,第一種的Reject可讓RabbitMQ Server將該Message 發送到下一個Consumer。第二種是從Queue中當即刪除該Message。

2.4.3 Creating a queue

Consumer和Procuder均可以經過 queue.declare 建立queue。對於某個Channel來講,Consumer不能declare一個queue,卻訂閱其餘的queue。固然也能夠建立私有的queue。這樣只有APP自己纔可使用這個queue。queue也能夠自動刪除,被標爲auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麼若是是建立一個已經存在的queue呢?那麼不會有任何的影響。須要注意的是沒有任何的影響,也就是說第二次建立若是參數和第一次不同,那麼該操做雖然成功,可是queue的屬性並不會被修改。

那麼誰應該負責建立這個queue呢?是Consumer,仍是Producer?

若是queue不存在,固然Consumer不會獲得任何的Message。那麼Producer Publish的Message會被丟棄。因此,仍是爲了數據不丟失,Consumer和Producer都try to create the queue!反正無論怎麼樣,這個接口都不會出問題。

queue對load balance的處理是完美的。對於多個Consumer來講,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不一樣的Consumer。

2.4.4 Exchanges

從架構圖能夠看出,Procuder Publish的Message進入了Exchange。接着經過"routing keys」, RabbitMQ會找到應該把這個Message放到哪一個queue裏。queue也是經過這個routing keys來作的綁定。有三種類型的Exchanges:direct, fanout,topic。 每一個實現了不一樣的路由算法(routing algorithm)。

  • Direct exchange:**若是 routing key 匹配,那麼Message就會被傳遞到相應的queue中。其實在queue建立時,它會自動的以queue的名字做爲routing key來綁定那個exchange。
  • Fanout exchange: 會向響應的queue廣播。
  • **Topic exchange:**對key進行模式匹配,好比ab能夠傳遞到全部ab的queue。

2.4.5 Virtual hosts

每一個virtual host本質上都是一個RabbitMQ Server,擁有它本身的queue,exchagne,和bings rule等等。這保證了你能夠在多個不一樣的Application中使用RabbitMQ。

2.4.6 爲何使用channel不實用tcp連接

  • tcp建立銷燬有三次握手和四次揮手,開銷太大
  • 操做系統tcp連接有限制,若是使用tcp連接,高峯期每秒成千上萬的連接形成資源浪費
  • channel的原理一個進程一條通道,多條進程多條通道公用一條tcp連接,一條tcp連接能夠容納無限的channel,不會有性能瓶頸。

三 部署

此文檔爲centos7 安裝部署

3.1 安裝erlang

# 配置yum源
cat > /etc/yum.repos.d/erlang.repo << EOF
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/\$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
EOF

複製代碼

3.2 配置yum源

cat > /etc/yum.repos.d/rabbitmq.repo <<EOF
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
EOF

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
複製代碼

3.3 rabbitmq服務

yum -y install rabbitmq-server
chkconfig rabbitmq-server on


# 更改rabbitmq數據和日誌存儲目錄
# 建立數據和日誌目錄
mkdir -pv /data/rabbitmq/mnesia
mkdir -pv /data/rabbitmq/log
chown rabbitmq.rabbitmq /data/rabbitmq/* -R

# 建立配置文件
cat >/etc/rabbitmq/rabbitmq-env.conf <<EOF
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/data/rabbitmq/log
EOF


systemctl status rabbitmq-server

# 檢查本地cli工具是否定證成功
sudo rabbitmq-diagnostics ping

# 打印應用啓用的組件,tcp反省,內存使用,告警等等。
sudo rabbitmq-diagnostics status

# 打印節點有效的配置
sudo rabbitmq-diagnostics environment

# 本地節點監控檢查
sudo rabbitmq-diagnostics node_health_check

# 添加用戶
rabbitmqctl add_user xuel xuelpwd
rabbitmqctl list_users
Listing users ...
user	tags
xuel	[xuel]
guest	[administrator]

# 角色定義
none  最小權限角色
management 管理員角色
policymaker   決策者
monitoring  監控
administrator  超級管理員 


[root@VM_0_12_centos ~]# rabbitmqctl set_user_tags xuel administrator
Setting tags for user "xuel" to [administrator] ...
[root@VM_0_12_centos ~]# rabbitmqctl list_users
Listing users ...
user	tags
xuel	[administrator]
guest	[administrator]


#查看全部的隊列:
rabbitmqctl list_queues

# 新增虛擬主機:
rabbitmqctl add_vhost  vhost_name
# 將新虛擬主機受權給新用戶:
rabbitmqctl set_permissions -p vhost_name username '.*' '.*' '.*'
複製代碼

3.4 配置

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.17/ebin/rabbit.app
複製代碼

3.5 插件

rabbitmq-plugins enable rabbitmq_management

# rabbitmq 爲了安全guest用戶只能localhost訪問,開啓guest/guest登錄
cat > /etc/rabbitmq/rabbitmq.config <<EOF
[{rabbit, [{loopback_users, []}]}].
EOF

systemctl restart rabbitmq-server

# 頁面訪問http://ip:15672
複製代碼

四 使用

因爲技術棧爲python,此處簡單舉例python中rabbitmq的使用

  • Introduction

因爲AMQP是雙向RPC協議,客戶端能夠向服務器發送請求,服務器能夠向客戶端發送請求,所以Pika在其每一個異步鏈接適配器中實現或擴展IO循環。這些IO循環是阻塞循環和偵聽事件的方法。每一個異步適配器都遵循相同的標準來調用IO循環。建立鏈接適配器時會建立IO循環。要爲任何給定的適配器啓動IO循環,請調用connection.ioloop.start()方法。

  • install
pip install pika
複製代碼
  • demo
  1. We start by creating our connection object, then starting our event loop.
  2. When we are connected, the on_connected method is called. In that method we create a channel.
  3. When the channel is created, the on_channel_open method is called. In that method we declare a queue.
  4. When the queue is declared successfully, on_queue_declared is called. In that method we call channel.basic_consume telling it to call the handle_delivery for each message RabbitMQ delivers to us.
  5. When RabbitMQ has a message to send us, it calls the handle_delivery method passing the AMQP Method frame, Header frame, and Body.
import pika

# Create a global channel variable to hold our channel object in
channel = None

# Step #2
def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    # Open a channel
    connection.channel(on_open_callback=on_channel_open)

# Step #3
def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False, callback=on_queue_declared)

# Step #4
def on_queue_declared(frame):
    """Called when RabbitMQ has told us our Queue has been declared, frame is the response from RabbitMQ"""
    channel.basic_consume('test', handle_delivery)

# Step #5
def handle_delivery(channel, method, header, body):
    """Called when we receive a message from RabbitMQ"""
    print(body)

# Step #1: Connect to RabbitMQ using the default parameters
parameters = pika.ConnectionParameters()
connection = pika.SelectConnection(parameters, on_open_callback=on_connected)

try:
    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()
複製代碼

五 消息發送服務設計

圖片描述

最總消費者和生產者整體均跑在k8s集羣總,對於消息發送服務生產者發送消息攜帶routing_key,使用confirm確認,exchange使用direct模式,對應bind_key發送到對應queue中,在對英queue的connection中啓動多個channel,每一個對應本身多個consumer來提升併發。

參考連接

相關文章
相關標籤/搜索