RabbitMQ快速入門及python代碼示例

RabbitMQ快速入門及python代碼示例

安裝

系統環境html

[root@Centos ~]# uname -r
3.10.0-1127.el7.x86_64
[root@Centos ~]# cat /etc/redhat-release
CentOS Linux release 7.8.2003 (Core)

# 關閉firewalld
[root@Centos mq]# systemctl stop firewalld
[root@Centos mq]# systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.

# 禁用selinux
[root@Centos mq]# setenforce 0
[root@Centos mq]# getenforce
Permissive
[root@Centos mq]# vi /etc/sysconfig/selinux
把 SELINUX=enforcing   修改成 SELINUX=disabled

處理依賴node

  1. RabbitMQ是由Erlang語言編寫,因此須要安裝Erlang運行環境。
  2. 依賴socat,能夠不用單獨下載,經過yum安裝時處理依賴關係

RabbitMQ版本與Erlang版本的兼容關係請查看:https://www.rabbitmq.com/which-erlang.htmlpython

這次安裝版本:linux

RabbitMQ 3.8.5 下載地址:https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/git

Erlang 22.3.4 下載地址:https://github.com/rabbitmq/erlang-rpm/releasesgithub

[root@Centos mq]# pwd
/root/mq
[root@Centos mq]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.4.2/erlang-22.3.4.2-1.el7.x86_64.rpm
[root@Centos mq]# wget https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/rabbitmq-server-3.8.5-1.el7.noarch.rpm
[root@Centos mq]# ls -lh
total 34M
-rw-r--r--. 1 root root 20M Jun 26 20:28 erlang-22.3.4.2-1.el7.x86_64.rpm
-rw-r--r--. 1 root root 15M Jun 15 21:46 rabbitmq-server-3.8.5-1.el7.noarch.rpm

[root@Centos mq]# yum -y localinstall erlang-22.3.4.2-1.el7.x86_64.rpm rabbitmq-server-3.8.5-1.el7.noarch.rpm
Failed to set locale, defaulting to C
Loaded plugins: fastestmirror
Examining erlang-22.3.4.2-1.el7.x86_64.rpm: erlang-22.3.4.2-1.el7.x86_64
Marking erlang-22.3.4.2-1.el7.x86_64.rpm to be installed
Examining rabbitmq-server-3.8.5-1.el7.noarch.rpm: rabbitmq-server-3.8.5-1.el7.noarch
Marking rabbitmq-server-3.8.5-1.el7.noarch.rpm to be installed
Resolving Dependencies
--> Running transaction check
---> Package erlang.x86_64 0:22.3.4.2-1.el7 will be installed
---> Package rabbitmq-server.noarch 0:3.8.5-1.el7 will be installed
--> Processing Dependency: socat for package: rabbitmq-server-3.8.5-1.el7.noarch
Loading mirror speeds from cached hostfile
 * base: mirror.hkt.cc
 * extras: mirror.hkt.cc
 * updates: mirror.hkt.cc
--> Running transaction check
---> Package socat.x86_64 0:1.7.3.2-2.el7 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

====================================================================================================================
 Package                 Arch           Version                   Repository                                   Size
====================================================================================================================
Installing:
 erlang                  x86_64         22.3.4.2-1.el7            /erlang-22.3.4.2-1.el7.x86_64                34 M
 rabbitmq-server         noarch         3.8.5-1.el7               /rabbitmq-server-3.8.5-1.el7.noarch          15 M
Installing for dependencies:
 socat                   x86_64         1.7.3.2-2.el7             base                                        290 k

Transaction Summary
====================================================================================================================
Install  2 Packages (+1 Dependent package)

Total size: 50 M
Total download size: 290 k
Installed size: 51 M
Downloading packages:
socat-1.7.3.2-2.el7.x86_64.rpm                                                               | 290 kB  00:00:00
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : erlang-22.3.4.2-1.el7.x86_64                                                                     1/3
  Installing : socat-1.7.3.2-2.el7.x86_64                                                                       2/3
  Installing : rabbitmq-server-3.8.5-1.el7.noarch                                                               3/3
  Verifying  : rabbitmq-server-3.8.5-1.el7.noarch                                                               1/3
  Verifying  : socat-1.7.3.2-2.el7.x86_64                                                                       2/3
  Verifying  : erlang-22.3.4.2-1.el7.x86_64                                                                     3/3

Installed:
  erlang.x86_64 0:22.3.4.2-1.el7                        rabbitmq-server.noarch 0:3.8.5-1.el7

Dependency Installed:
  socat.x86_64 0:1.7.3.2-2.el7

Complete!
[root@Centos mq]#

安裝完成後通常不須要額外的配置便可啓動RabbitMQweb

[root@Centos mq]# systemctl start rabbitmq-server
[root@Centos mq]# systemctl status rabbitmq-server
● rabbitmq-server.service - RabbitMQ broker
   Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
   Active: active (running) since Mon 2020-07-06 18:05:23 CST; 21s ago
 Main PID: 28698 (beam.smp)
   Status: "Initialized"
   CGroup: /system.slice/rabbitmq-server.service
           ├─28698 /usr/lib64/erlang/erts-10.7.2.1/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf ...
           ├─28801 erl_child_setup 32768
           ├─28825 /usr/lib64/erlang/erts-10.7.2.1/bin/epmd -daemon
           ├─28845 inet_gethost 4
           └─28846 inet_gethost 4

Jul 06 18:05:21 Centos rabbitmq-server[28698]: ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Doc guides: https://rabbitmq.com/documentation.html
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Support:    https://rabbitmq.com/contact.html
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Tutorials:  https://rabbitmq.com/getstarted.html
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Monitoring: https://rabbitmq.com/monitoring.html
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Logs: /var/log/rabbitmq/rabbit@Centos.log
Jul 06 18:05:21 Centos rabbitmq-server[28698]: /var/log/rabbitmq/rabbit@Centos_upgrade.log
Jul 06 18:05:21 Centos rabbitmq-server[28698]: Config file(s): (none)
Jul 06 18:05:23 Centos systemd[1]: Started RabbitMQ broker.
Jul 06 18:05:23 Centos rabbitmq-server[28698]: Starting broker... completed with 0 plugins.
Hint: Some lines were ellipsized, use -l to show in full.

[root@Centos mq]# ss -tan | grep 5672
LISTEN     0      128          *:25672                    *:*
LISTEN     0      128       [::]:5672                  [::]:*

5672是工做端口,25672是集羣間通訊商品。shell

啓動web管理插件緩存

[root@Centos mq]# rabbitmq
rabbitmqctl           rabbitmq-plugins      rabbitmq-server
rabbitmq-diagnostics  rabbitmq-queues       rabbitmq-upgrade
[root@Centos mq]# rabbitmq-plugins list
warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell)
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@Centos
 |/
[  ] rabbitmq_amqp1_0                  3.8.5
[  ] rabbitmq_auth_backend_cache       3.8.5
[  ] rabbitmq_auth_backend_http        3.8.5
[  ] rabbitmq_auth_backend_ldap        3.8.5
[  ] rabbitmq_auth_backend_oauth2      3.8.5
[  ] rabbitmq_auth_mechanism_ssl       3.8.5
[  ] rabbitmq_consistent_hash_exchange 3.8.5
[  ] rabbitmq_event_exchange           3.8.5
[  ] rabbitmq_federation               3.8.5
[  ] rabbitmq_federation_management    3.8.5
[  ] rabbitmq_jms_topic_exchange       3.8.5
[  ] rabbitmq_management               3.8.5
[  ] rabbitmq_management_agent         3.8.5
[  ] rabbitmq_mqtt                     3.8.5
[  ] rabbitmq_peer_discovery_aws       3.8.5
[  ] rabbitmq_peer_discovery_common    3.8.5
[  ] rabbitmq_peer_discovery_consul    3.8.5
[  ] rabbitmq_peer_discovery_etcd      3.8.5
[  ] rabbitmq_peer_discovery_k8s       3.8.5
[  ] rabbitmq_prometheus               3.8.5
[  ] rabbitmq_random_exchange          3.8.5
[  ] rabbitmq_recent_history_exchange  3.8.5
[  ] rabbitmq_sharding                 3.8.5
[  ] rabbitmq_shovel                   3.8.5
[  ] rabbitmq_shovel_management        3.8.5
[  ] rabbitmq_stomp                    3.8.5
[  ] rabbitmq_top                      3.8.5
[  ] rabbitmq_tracing                  3.8.5
[  ] rabbitmq_trust_store              3.8.5
[  ] rabbitmq_web_dispatch             3.8.5
[  ] rabbitmq_web_mqtt                 3.8.5
[  ] rabbitmq_web_mqtt_examples        3.8.5
[  ] rabbitmq_web_stomp                3.8.5
[  ] rabbitmq_web_stomp_examples       3.8.5
[root@Centos mq]# rabbitmq-plugins enable rabbitmq_management
warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell)
Enabling plugins on node rabbit@Centos:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@Centos...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch

started 3 plugins.
[root@Centos mq]# ss -tanl | grep 5672
LISTEN     0      128          *:25672                    *:*
LISTEN     0      128          *:15672                    *:*
LISTEN     0      128       [::]:5672                  [::]:*

rabbitmq-management管理插件啓用後會監聽15672端口,做爲管理web登錄:http://ip:15672,默認用戶爲guest,密碼guest,此用戶只能從本地登錄bash

用戶管理

rabbitmqctl add_user username password  增長用戶
rabbitmqctl delete_user username    刪除用戶
rabbitmqctl change_password username newpassword  更改密碼
rabbitmqctl set_user_tags username tag  設置權限tag
[root@Centos mq]# rabbitmqctl add_user admin admin123
Adding user "admin" ...
[root@Centos mq]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

RabbitMQ快速入門及python代碼示例

上圖中說明用戶屬於不一樣的tag擁有的權限。

當RabbitMQ安裝好後,僅有guest用戶擁有對默認虛擬主機「/」有訪問權限,而咱們本身增長的「admin」用戶沒有虛擬主機與之對應,因此咱們須要增長一個虛擬主機。

RabbitMQ快速入門及python代碼示例

RabbitMQ快速入門及python代碼示例

RabbitMQ經常使用的工做模型

RabbitMQ快速入門及python代碼示例

RabbitMQ快速入門及python代碼示例

其中1-5是在工做中常見的模型。下邊針對這5種模型展開講解

專業術語

Message Broker: 消息代理,RabbitMQ就是一個消息代理server

Producing: 生產,指僅發送消息數據,發送消息數據的程序就是Producer

Queue: 隊列,指RabbitMQ服務內部郵箱名稱,是存儲消息數據的容器,數據的存儲載體,只受主機的內存和硬盤約束,實質是一個大的消息緩衝區。

Consuming:消費,接收消息,接收消息數據的程序就是Consumer

Channel: 通道,一個鏈接容許多個客戶端鏈接

Exchange: 交換機(器),接收生產者發來的消息,決定如何路由給服務器中的隊列。經常使用的類型有:direct(point-to-point)、topic(publish-subscribe)、fanout(multicast)

Bind: 綁定,創建消息隊列和交換器間的關係,即交換器拿到數據,把什麼樣的數據送給哪一個隊列

Virtual Hosts: 虛擬主機,一批交換機、消息隊列和相關對象的集合。爲了多用戶互不干擾,使用虛擬主機分組交換機、消息隊列

Topic: 主題

下邊針對不一樣的工做模型使用python代碼來講明講解,使用的庫爲pika

$ pip install pika

簡單隊列

RabbitMQ快速入門及python代碼示例

這是最爲簡單的生產者消費者模型,消息隊列就是一個FIFO的隊列。

#producer.py
import pika

#創建鏈接
credentials = pika.PlainCredentials('admin', 'admin123')
parameters = pika.ConnectionParameters('172.16.152.130',
                                       5672,
                                       'test_vh',  # 虛擬主機
                                       credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  # 建立一個通道

#通道上聲明隊列名稱,server中沒有此隊列就會建立
channel.queue_declare(queue='hello')

#使用默認交換機把body內容發送到名稱爲hello的隊列中
channel.basic_publish(exchange='',   # 爲空字符表示使用默認的交換機
                      routing_key='hello',  # exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列
                      body='hello world!'  # 消息主體
                      )
print("Sent 'hello world!'")

connection.close()  #關閉鏈接

運行該程序後,能夠在web管理界面中查看到相應的Exchange,Queue已建立

RabbitMQ快速入門及python代碼示例

可見一個虛擬主機下會自動建立各類類型的交換機。

RabbitMQ快速入門及python代碼示例

上邊代碼中建立鏈接過於複雜,pika提供了另外一種更優雅的方法,代碼修改以下:

#producer.py
import pika

#更優雅的方式建立鏈接參數
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')

connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

#通道上聲明隊列名稱,server中沒有此隊列就會建立
channel.queue_declare(queue='hello')

#使用默認交換機把body內容發送到名稱爲hello的隊列中
channel.basic_publish(exchange='',   #爲空字符表示使用默認的交換機
                      routing_key='hello',  #exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列
                      body='hello world!!!!'  #消息主體
                      )

print("Sent 'hello world!!!!'")

connection.close()  #關閉鏈接

消費方的代碼以下:

#consumer.py
import pika

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)  #阻塞鏈接
channel = connection.channel()

channel.queue_declare(queue='hello')  #聲明隊列

#回調函數
def callback(ch, method, properties, body):
    print('Received: {}'.format(body))
    print('channel: {}'.format(ch))
    print('method: {}'.format(method))
    print('properties: {}'.format(properties))

channel.basic_consume(
    queue='hello',
    on_message_callback=callback,
    auto_ack=True
)

print('Waiting for message. To exit press CTRL + C')
channel.start_consuming()  #循環取隊列數據

說明:

channel.basic_consume()中的auto_ack=True 時,表示隊列中的數據被消費後就被確認已被消費掉,若設置爲False 那當前消費者程序斷開後,以前被消費過的數據又被置爲Ready狀態,即又能被消費者從新消費。

工做隊列

RabbitMQ快速入門及python代碼示例

工做隊列即爲簡單隊列模型中的一個消費者變爲多個消費者。把生產者producer.py 中生產數據略爲修改:

#producer.py

import pika
import time

#創建鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')

connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

#通道上聲明隊列名稱,server中沒有此隊列就會建立
channel.queue_declare(queue='hello')

#使用默認交換機把body內容發送到名稱爲hello的隊列中
for i in range(40):
    channel.basic_publish(exchange='',   #爲空字符表示使用默認的交換機
                          routing_key='hello',  #exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列
                          body='data{:02}'.format(i)  #消息主體
                          )
    time.sleep(0.2)

print("Sent 'hello world!!!!'")

connection.close()  #關閉鏈接

消費者代碼:

#consumer1.py
import pika

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)  #阻塞鏈接
channel = connection.channel()

channel.queue_declare(queue='hello')  #聲明隊列

#回調函數
def callback(ch, method, properties, body):
    print('Received: {}'.format(body))

channel.basic_consume(
    queue='hello',
    on_message_callback=callback,
    auto_ack=True
)

print('Waiting for message. To exit press CTRL + C')
channel.start_consuming()  #循環取隊列數據

測試時先運行消費者代碼,並複製多份運行,以運行兩份爲例,再運行生產者代碼,能夠從輸出中觀測到

#第一個消費者輸出
Received: b'data00'
Received: b'data02'
Received: b'data04'
Received: b'data06'
Received: b'data08'
Received: b'data10'
Received: b'data12'
Received: b'data14'
Received: b'data16'
Received: b'data18'
...
Received: b'data38'

#第二個消費者輸出
Received: b'data01'
Received: b'data03'
Received: b'data05'
Received: b'data07'
Received: b'data09'
Received: b'data11'
Received: b'data13'
Received: b'data15'
Received: b'data17'
Received: b'data19'
...
Received: b'data39'

可知,這種工做模式是一種競爭的工做方式,消息隊列中的一個消息只能由一個消費者消費,並且從結果可知,不一樣的消費者取數據是以輪詢的方式。

簡單隊列工做隊列模式的圖中沒有畫出交換機,但都使用了默認的交換機。

發佈/訂閱模型 Publish/Subscribe

試想生活中的訂報紙這樣一個場景,全部的訂閱者(消費者)訂閱一份報紙(消息),都應該拿到一分內容相同的報紙。

報社發佈報紙到郵局(Exchange),郵局決定經過怎樣的方式把報紙送到訂閱都的信箱,訂閱者從本身的信箱(Queue)中獲取報紙。

RabbitMQ快速入門及python代碼示例

當前Publish/Subscribe這種模式,Exchange的類型爲fanout,即爲一對多,廣播模式。

RabbitMQ快速入門及python代碼示例

多個Queue須要與Exchange創建關係,這裏就是Binging。

生產者代碼以下:

import pika
import time

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

#定義交換機
channel.exchange_declare(exchange='logs',  #指定交換機
                         exchange_type='fanout'  #指定交換機類型
                         )

#向交換機中發送數據
for i in range(40):
    channel.basic_publish(exchange='logs',   #指定交換機
                          routing_key='',  #fanout類型不指定
                          body='data{:02}'.format(i)  #消息主體
                          )
    time.sleep(0.2)

print("Sent 'hello world!!!!'")

connection.close()  # 關閉鏈接

生產者不關心queue,只關心數據要發往哪一個交換機。沒有queue來存儲數據意味着沒有消費者時,生產者生產的數據發送到交換機後就丟棄掉。

消費者代碼以下:

import pika

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

#定義交換機
channel.exchange_declare(exchange='logs',  #指定交換機
                         exchange_type='fanout'  #指定交換機類型
                         )

#生成queue
result1 = channel.queue_declare(queue='',  #爲空字符串時會生成一個惟一的隊列名稱
                                exclusive=True  #表示當前生成的隊列只容許當前這個鏈接使用,鏈接一旦斷開,當前隊列也將自動刪除
                                )
result2 = channel.queue_declare(queue='', exclusive=True)

q1name = result1.method.queue  #獲取隊列的名稱
q2name = result2.method.queue
print(q1name, q2name)

#綁定binding
channel.queue_bind(exchange='logs', queue=q1name)
channel.queue_bind(exchange='logs', queue=q2name)
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    #print(ch, method, properties, body)
    print('Received: {}'.format(body))

channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True)
channel.basic_consume(queue=q2name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

先運行消費者代碼後,logs交換機被建立,相應的queue被建立

RabbitMQ快速入門及python代碼示例

RabbitMQ快速入門及python代碼示例

再運行生產者代碼,輸出以下:

amq.gen-_pqdfOYPRb0kvKgLBEZSCw amq.gen-PUV29dEM8j2EKYjcmS78BQ
 [*] Waiting for logs. To exit press CTRL+C
Received: b'data00'
Received: b'data00'
Received: b'data01'
Received: b'data01'
Received: b'data02'
Received: b'data02'
Received: b'data03'
Received: b'data03'
Received: b'data04'
Received: b'data04'
...
Received: b'data37'
Received: b'data37'
Received: b'data38'
Received: b'data38'
Received: b'data39'
Received: b'data39'

每個消息都打印了兩次,在實際生產環境中,若是一個生產者的數據有可能多個業務模塊都須要獲取,那能夠採起此種模式,只要在該業務模塊中指定相應的交換機,本身生成一個queue來緩存相應的數據便可。

若是先啓動了生產者,接着啓動消費者,那部分數據會被丟失。因沒有queue來存儲數據,exchange收到數據後就丟掉。

路由Routing模型

RabbitMQ快速入門及python代碼示例

Routing模型就是數據發送到交換機後根據規則(routing_key)進行路由發送。該模型下交換機類型爲direct

生產者代碼:

import pika
import time
import random

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

x_name = 'color'  #交換機名稱
colors = ('orange', 'red', 'green', 'black')  # routing_key

# 定義交換機及類型
channel.exchange_declare(exchange=x_name,  #交換機名稱
                         exchange_type='direct',  #路由
                         )

for i in range(20):
    rk = colors[random.randint(0, 3)]
    channel.basic_publish(
        exchange=x_name,
        routing_key=rk,
        body='data_{}_{:02}'.format(rk, i)
    )
    time.sleep(0.2)

connection.close()

消費者代碼:

import pika

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

x_name = 'color'  #交換機名稱
colors = ('orange', 'red', 'green', 'black')  #routing_key

#定義交換機
channel.exchange_declare(exchange=x_name,  #指定交換機
                         exchange_type='direct',  #指定交換機類型
                         )

#生成queue
result1 = channel.queue_declare(queue='', exclusive=True)
q1name = result1.method.queue
print(q1name)

#綁定
channel.queue_bind(exchange=x_name, queue=q1name, routing_key=colors[0])
print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    # print(ch, method, properties, body)
    print('Received: {}'.format(body))

channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

先運行消費者,再運行生產者,消費者的終端輸出

amq.gen-DAOzdpOzfFodaJI9xj6b4Q
 [*] Waiting for logs. To exit press CTRL+C
Received: b'data_orange_01'
Received: b'data_orange_06'
Received: b'data_orange_08'
Received: b'data_orange_09'
Received: b'data_orange_17'
Received: b'data_orange_18'

只有routing_key=colors[0]即爲orange的消息才被路由到了消費都定義的queue上後被消費者獲取。

在web界面中也能查看到相應的交換機、queue、routing_key間的綁定關係

RabbitMQ快速入門及python代碼示例

多重綁定

RabbitMQ快速入門及python代碼示例

如圖,若是一個routing_key被屢次綁定,那和fanout模式就相似了,但又有不一樣,fanout時exchange不作數據過慮,而direct時仍然會作數據過濾這個動做,只是過濾後會把相應的消息發送到多個隊列中。

Topic模型

RabbitMQ快速入門及python代碼示例

Topic的routing_key必須使用.點號 分割的單詞組成。支持通配符:

*   表示嚴格的一個單詞
#   表示0個或多個單詞

若是queue綁定的routing_key只是一個#,這個queue其實能夠接收全部消息,相似於fanout

若是沒有使用任何通配符,效果相似於direct。

生產者代碼:

import pika
import time
import random

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

x_name = 'products'  #交換機名稱
product_types = ('phone', 'pc', 'tv')  #產品類型
colors = ('red', 'green', 'blue')

#定義交換機及類型
channel.exchange_declare(exchange=x_name,  #交換機名稱
                         exchange_type='topic',  #話題模式
                         )

for i in range(20):
    rk = '{}.{}'.format(product_types[random.randint(0, 2)], colors[random.randint(0, 2)])
    msg = 'data_{}_{:02}'.format(rk, i)
    channel.basic_publish(exchange=x_name,
                          routing_key=rk,
                          body=msg
                          )
    time.sleep(0.2)

connection.close()

消費者代碼:

import pika

#建立鏈接
parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh')
connection = pika.BlockingConnection(parameters)
channel = connection.channel()  #通連上建立一個通道

x_name = 'products'  #交換機名稱
topics = ('phone.*', '*.red')

#定義交換機
channel.exchange_declare(exchange=x_name,  #指定交換機
                         exchange_type='topic',  #指定交換機類型
                         )

#生成queue
q1 = channel.queue_declare(queue='', exclusive=True)
q1name = q1.method.queue

#綁定
channel.queue_bind(exchange=x_name, queue=q1name, routing_key=topics[0]) #修改routing_key後再測試

def callback(ch, method, properties, body):
    #print(ch, method, properties, body)
    print('Received: {}'.format(body))

#消費
channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True)

#循環取隊列數據
print('Waiting for message. To exit press CTRL+C')
channel.start_consuming()

先運行消費者,再運行生產者,在消費者程序的終端中輸出:

Waiting for message. To exit press CTRL+C
Received: b'data_phone.blue_00'
Received: b'data_phone.red_01'
Received: b'data_phone.green_02'
Received: b'data_phone.green_03'
Received: b'data_phone.blue_06'
Received: b'data_phone.blue_08'
Received: b'data_phone.green_10'
Received: b'data_phone.blue_16'

符合topics[0],即phone.*的匹配模式。

Topic實質上也是direct,只是支持模式匹配而已。

RPC和Publisher Confirms使用較少,不作說明。

參考:

https://www.rabbitmq.com/install-rpm.html#downloads

https://www.rabbitmq.com/tutorials/tutorial-one-python.html

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

https://www.rabbitmq.com/tutorials/tutorial-three-python.html

https://www.rabbitmq.com/tutorials/tutorial-four-python.html

https://www.rabbitmq.com/tutorials/tutorial-five-python.html

相關文章
相關標籤/搜索