系統環境html
[root@Centos ~]# uname -r 3.10.0-1127.el7.x86_64 [root@Centos ~]# cat /etc/redhat-release CentOS Linux release 7.8.2003 (Core) # 關閉firewalld [root@Centos mq]# systemctl stop firewalld [root@Centos mq]# systemctl disable firewalld Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service. Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service. # 禁用selinux [root@Centos mq]# setenforce 0 [root@Centos mq]# getenforce Permissive [root@Centos mq]# vi /etc/sysconfig/selinux 把 SELINUX=enforcing 修改成 SELINUX=disabled
處理依賴node
RabbitMQ版本與Erlang版本的兼容關係請查看:https://www.rabbitmq.com/which-erlang.htmlpython
這次安裝版本:linux
RabbitMQ 3.8.5 下載地址:https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/git
Erlang 22.3.4 下載地址:https://github.com/rabbitmq/erlang-rpm/releasesgithub
[root@Centos mq]# pwd /root/mq [root@Centos mq]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v22.3.4.2/erlang-22.3.4.2-1.el7.x86_64.rpm [root@Centos mq]# wget https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/noarch/rabbitmq-server-3.8.5-1.el7.noarch.rpm [root@Centos mq]# ls -lh total 34M -rw-r--r--. 1 root root 20M Jun 26 20:28 erlang-22.3.4.2-1.el7.x86_64.rpm -rw-r--r--. 1 root root 15M Jun 15 21:46 rabbitmq-server-3.8.5-1.el7.noarch.rpm [root@Centos mq]# yum -y localinstall erlang-22.3.4.2-1.el7.x86_64.rpm rabbitmq-server-3.8.5-1.el7.noarch.rpm Failed to set locale, defaulting to C Loaded plugins: fastestmirror Examining erlang-22.3.4.2-1.el7.x86_64.rpm: erlang-22.3.4.2-1.el7.x86_64 Marking erlang-22.3.4.2-1.el7.x86_64.rpm to be installed Examining rabbitmq-server-3.8.5-1.el7.noarch.rpm: rabbitmq-server-3.8.5-1.el7.noarch Marking rabbitmq-server-3.8.5-1.el7.noarch.rpm to be installed Resolving Dependencies --> Running transaction check ---> Package erlang.x86_64 0:22.3.4.2-1.el7 will be installed ---> Package rabbitmq-server.noarch 0:3.8.5-1.el7 will be installed --> Processing Dependency: socat for package: rabbitmq-server-3.8.5-1.el7.noarch Loading mirror speeds from cached hostfile * base: mirror.hkt.cc * extras: mirror.hkt.cc * updates: mirror.hkt.cc --> Running transaction check ---> Package socat.x86_64 0:1.7.3.2-2.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ==================================================================================================================== Package Arch Version Repository Size ==================================================================================================================== Installing: erlang x86_64 22.3.4.2-1.el7 /erlang-22.3.4.2-1.el7.x86_64 34 M rabbitmq-server noarch 3.8.5-1.el7 /rabbitmq-server-3.8.5-1.el7.noarch 15 M Installing for dependencies: socat x86_64 1.7.3.2-2.el7 base 290 k Transaction Summary ==================================================================================================================== Install 2 Packages (+1 Dependent package) Total size: 50 M Total download size: 290 k Installed size: 51 M Downloading packages: socat-1.7.3.2-2.el7.x86_64.rpm | 290 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : erlang-22.3.4.2-1.el7.x86_64 1/3 Installing : socat-1.7.3.2-2.el7.x86_64 2/3 Installing : rabbitmq-server-3.8.5-1.el7.noarch 3/3 Verifying : rabbitmq-server-3.8.5-1.el7.noarch 1/3 Verifying : socat-1.7.3.2-2.el7.x86_64 2/3 Verifying : erlang-22.3.4.2-1.el7.x86_64 3/3 Installed: erlang.x86_64 0:22.3.4.2-1.el7 rabbitmq-server.noarch 0:3.8.5-1.el7 Dependency Installed: socat.x86_64 0:1.7.3.2-2.el7 Complete! [root@Centos mq]#
安裝完成後通常不須要額外的配置便可啓動RabbitMQweb
[root@Centos mq]# systemctl start rabbitmq-server [root@Centos mq]# systemctl status rabbitmq-server ● rabbitmq-server.service - RabbitMQ broker Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled) Active: active (running) since Mon 2020-07-06 18:05:23 CST; 21s ago Main PID: 28698 (beam.smp) Status: "Initialized" CGroup: /system.slice/rabbitmq-server.service ├─28698 /usr/lib64/erlang/erts-10.7.2.1/bin/beam.smp -W w -K true -A 64 -MBas ageffcbf -MHas ageffcbf ... ├─28801 erl_child_setup 32768 ├─28825 /usr/lib64/erlang/erts-10.7.2.1/bin/epmd -daemon ├─28845 inet_gethost 4 └─28846 inet_gethost 4 Jul 06 18:05:21 Centos rabbitmq-server[28698]: ########## Licensed under the MPL 1.1. Website: https://rabbitmq.com Jul 06 18:05:21 Centos rabbitmq-server[28698]: Doc guides: https://rabbitmq.com/documentation.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Support: https://rabbitmq.com/contact.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Tutorials: https://rabbitmq.com/getstarted.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Monitoring: https://rabbitmq.com/monitoring.html Jul 06 18:05:21 Centos rabbitmq-server[28698]: Logs: /var/log/rabbitmq/rabbit@Centos.log Jul 06 18:05:21 Centos rabbitmq-server[28698]: /var/log/rabbitmq/rabbit@Centos_upgrade.log Jul 06 18:05:21 Centos rabbitmq-server[28698]: Config file(s): (none) Jul 06 18:05:23 Centos systemd[1]: Started RabbitMQ broker. Jul 06 18:05:23 Centos rabbitmq-server[28698]: Starting broker... completed with 0 plugins. Hint: Some lines were ellipsized, use -l to show in full. [root@Centos mq]# ss -tan | grep 5672 LISTEN 0 128 *:25672 *:* LISTEN 0 128 [::]:5672 [::]:*
5672是工做端口,25672是集羣間通訊商品。shell
啓動web管理插件緩存
[root@Centos mq]# rabbitmq rabbitmqctl rabbitmq-plugins rabbitmq-server rabbitmq-diagnostics rabbitmq-queues rabbitmq-upgrade [root@Centos mq]# rabbitmq-plugins list warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell) Listing plugins with pattern ".*" ... Configured: E = explicitly enabled; e = implicitly enabled | Status: * = running on rabbit@Centos |/ [ ] rabbitmq_amqp1_0 3.8.5 [ ] rabbitmq_auth_backend_cache 3.8.5 [ ] rabbitmq_auth_backend_http 3.8.5 [ ] rabbitmq_auth_backend_ldap 3.8.5 [ ] rabbitmq_auth_backend_oauth2 3.8.5 [ ] rabbitmq_auth_mechanism_ssl 3.8.5 [ ] rabbitmq_consistent_hash_exchange 3.8.5 [ ] rabbitmq_event_exchange 3.8.5 [ ] rabbitmq_federation 3.8.5 [ ] rabbitmq_federation_management 3.8.5 [ ] rabbitmq_jms_topic_exchange 3.8.5 [ ] rabbitmq_management 3.8.5 [ ] rabbitmq_management_agent 3.8.5 [ ] rabbitmq_mqtt 3.8.5 [ ] rabbitmq_peer_discovery_aws 3.8.5 [ ] rabbitmq_peer_discovery_common 3.8.5 [ ] rabbitmq_peer_discovery_consul 3.8.5 [ ] rabbitmq_peer_discovery_etcd 3.8.5 [ ] rabbitmq_peer_discovery_k8s 3.8.5 [ ] rabbitmq_prometheus 3.8.5 [ ] rabbitmq_random_exchange 3.8.5 [ ] rabbitmq_recent_history_exchange 3.8.5 [ ] rabbitmq_sharding 3.8.5 [ ] rabbitmq_shovel 3.8.5 [ ] rabbitmq_shovel_management 3.8.5 [ ] rabbitmq_stomp 3.8.5 [ ] rabbitmq_top 3.8.5 [ ] rabbitmq_tracing 3.8.5 [ ] rabbitmq_trust_store 3.8.5 [ ] rabbitmq_web_dispatch 3.8.5 [ ] rabbitmq_web_mqtt 3.8.5 [ ] rabbitmq_web_mqtt_examples 3.8.5 [ ] rabbitmq_web_stomp 3.8.5 [ ] rabbitmq_web_stomp_examples 3.8.5 [root@Centos mq]# rabbitmq-plugins enable rabbitmq_management warning: the VM is running with native name encoding of latin1 which may cause Elixir to malfunction as it expects utf8. Please ensure your locale is set to UTF-8 (which can be verified by running "locale" in your shell) Enabling plugins on node rabbit@Centos: rabbitmq_management The following plugins have been configured: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@Centos... The following plugins have been enabled: rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch started 3 plugins. [root@Centos mq]# ss -tanl | grep 5672 LISTEN 0 128 *:25672 *:* LISTEN 0 128 *:15672 *:* LISTEN 0 128 [::]:5672 [::]:*
rabbitmq-management管理插件啓用後會監聽15672端口,做爲管理web登錄:http://ip:15672,默認用戶爲guest,密碼guest,此用戶只能從本地登錄。bash
用戶管理
rabbitmqctl add_user username password 增長用戶 rabbitmqctl delete_user username 刪除用戶 rabbitmqctl change_password username newpassword 更改密碼 rabbitmqctl set_user_tags username tag 設置權限tag
[root@Centos mq]# rabbitmqctl add_user admin admin123 Adding user "admin" ... [root@Centos mq]# rabbitmqctl set_user_tags admin administrator Setting tags for user "admin" to [administrator] ...
上圖中說明用戶屬於不一樣的tag擁有的權限。
當RabbitMQ安裝好後,僅有guest用戶擁有對默認虛擬主機「/」有訪問權限,而咱們本身增長的「admin」用戶沒有虛擬主機與之對應,因此咱們須要增長一個虛擬主機。
其中1-5是在工做中常見的模型。下邊針對這5種模型展開講解
專業術語
Message Broker: 消息代理,RabbitMQ就是一個消息代理server
Producing: 生產,指僅發送消息數據,發送消息數據的程序就是Producer
Queue: 隊列,指RabbitMQ服務內部郵箱名稱,是存儲消息數據的容器,數據的存儲載體,只受主機的內存和硬盤約束,實質是一個大的消息緩衝區。
Consuming:消費,接收消息,接收消息數據的程序就是Consumer
Channel: 通道,一個鏈接容許多個客戶端鏈接
Exchange: 交換機(器),接收生產者發來的消息,決定如何路由給服務器中的隊列。經常使用的類型有:direct(point-to-point)、topic(publish-subscribe)、fanout(multicast)
Bind: 綁定,創建消息隊列和交換器間的關係,即交換器拿到數據,把什麼樣的數據送給哪一個隊列
Virtual Hosts: 虛擬主機,一批交換機、消息隊列和相關對象的集合。爲了多用戶互不干擾,使用虛擬主機分組交換機、消息隊列
Topic: 主題
下邊針對不一樣的工做模型使用python代碼來講明講解,使用的庫爲pika
$ pip install pika
這是最爲簡單的生產者消費者模型,消息隊列就是一個FIFO的隊列。
#producer.py import pika #創建鏈接 credentials = pika.PlainCredentials('admin', 'admin123') parameters = pika.ConnectionParameters('172.16.152.130', 5672, 'test_vh', # 虛擬主機 credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() # 建立一個通道 #通道上聲明隊列名稱,server中沒有此隊列就會建立 channel.queue_declare(queue='hello') #使用默認交換機把body內容發送到名稱爲hello的隊列中 channel.basic_publish(exchange='', # 爲空字符表示使用默認的交換機 routing_key='hello', # exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列 body='hello world!' # 消息主體 ) print("Sent 'hello world!'") connection.close() #關閉鏈接
運行該程序後,能夠在web管理界面中查看到相應的Exchange,Queue已建立
可見一個虛擬主機下會自動建立各類類型的交換機。
上邊代碼中建立鏈接過於複雜,pika提供了另外一種更優雅的方法,代碼修改以下:
#producer.py import pika #更優雅的方式建立鏈接參數 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 #通道上聲明隊列名稱,server中沒有此隊列就會建立 channel.queue_declare(queue='hello') #使用默認交換機把body內容發送到名稱爲hello的隊列中 channel.basic_publish(exchange='', #爲空字符表示使用默認的交換機 routing_key='hello', #exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列 body='hello world!!!!' #消息主體 ) print("Sent 'hello world!!!!'") connection.close() #關閉鏈接
消費方的代碼以下:
#consumer.py import pika #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) #阻塞鏈接 channel = connection.channel() channel.queue_declare(queue='hello') #聲明隊列 #回調函數 def callback(ch, method, properties, body): print('Received: {}'.format(body)) print('channel: {}'.format(ch)) print('method: {}'.format(method)) print('properties: {}'.format(properties)) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True ) print('Waiting for message. To exit press CTRL + C') channel.start_consuming() #循環取隊列數據
說明:
channel.basic_consume()
中的auto_ack=True
時,表示隊列中的數據被消費後就被確認已被消費掉,若設置爲False
那當前消費者程序斷開後,以前被消費過的數據又被置爲Ready
狀態,即又能被消費者從新消費。
工做隊列即爲簡單隊列模型中的一個消費者變爲多個消費者。把生產者producer.py
中生產數據略爲修改:
#producer.py import pika import time #創建鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 #通道上聲明隊列名稱,server中沒有此隊列就會建立 channel.queue_declare(queue='hello') #使用默認交換機把body內容發送到名稱爲hello的隊列中 for i in range(40): channel.basic_publish(exchange='', #爲空字符表示使用默認的交換機 routing_key='hello', #exchange爲空字符串時,必須使用routing_key,表示把消息發往哪一個隊列 body='data{:02}'.format(i) #消息主體 ) time.sleep(0.2) print("Sent 'hello world!!!!'") connection.close() #關閉鏈接
消費者代碼:
#consumer1.py import pika #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) #阻塞鏈接 channel = connection.channel() channel.queue_declare(queue='hello') #聲明隊列 #回調函數 def callback(ch, method, properties, body): print('Received: {}'.format(body)) channel.basic_consume( queue='hello', on_message_callback=callback, auto_ack=True ) print('Waiting for message. To exit press CTRL + C') channel.start_consuming() #循環取隊列數據
測試時先運行消費者代碼,並複製多份運行,以運行兩份爲例,再運行生產者代碼,能夠從輸出中觀測到
#第一個消費者輸出 Received: b'data00' Received: b'data02' Received: b'data04' Received: b'data06' Received: b'data08' Received: b'data10' Received: b'data12' Received: b'data14' Received: b'data16' Received: b'data18' ... Received: b'data38' #第二個消費者輸出 Received: b'data01' Received: b'data03' Received: b'data05' Received: b'data07' Received: b'data09' Received: b'data11' Received: b'data13' Received: b'data15' Received: b'data17' Received: b'data19' ... Received: b'data39'
可知,這種工做模式是一種競爭的工做方式,消息隊列中的一個消息只能由一個消費者消費,並且從結果可知,不一樣的消費者取數據是以輪詢
的方式。
簡單隊列
和工做隊列
模式的圖中沒有畫出交換機,但都使用了默認的交換機。
試想生活中的訂報紙這樣一個場景,全部的訂閱者(消費者)訂閱一份報紙(消息),都應該拿到一分內容相同的報紙。
報社發佈報紙到郵局(Exchange),郵局決定經過怎樣的方式把報紙送到訂閱都的信箱,訂閱者從本身的信箱(Queue)中獲取報紙。
當前Publish/Subscribe這種模式,Exchange的類型爲fanout
,即爲一對多,廣播模式。
多個Queue須要與Exchange創建關係,這裏就是Binging。
生產者代碼以下:
import pika import time #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 #定義交換機 channel.exchange_declare(exchange='logs', #指定交換機 exchange_type='fanout' #指定交換機類型 ) #向交換機中發送數據 for i in range(40): channel.basic_publish(exchange='logs', #指定交換機 routing_key='', #fanout類型不指定 body='data{:02}'.format(i) #消息主體 ) time.sleep(0.2) print("Sent 'hello world!!!!'") connection.close() # 關閉鏈接
生產者不關心queue,只關心數據要發往哪一個交換機。沒有queue來存儲數據意味着沒有消費者時,生產者生產的數據發送到交換機後就丟棄掉。
消費者代碼以下:
import pika #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 #定義交換機 channel.exchange_declare(exchange='logs', #指定交換機 exchange_type='fanout' #指定交換機類型 ) #生成queue result1 = channel.queue_declare(queue='', #爲空字符串時會生成一個惟一的隊列名稱 exclusive=True #表示當前生成的隊列只容許當前這個鏈接使用,鏈接一旦斷開,當前隊列也將自動刪除 ) result2 = channel.queue_declare(queue='', exclusive=True) q1name = result1.method.queue #獲取隊列的名稱 q2name = result2.method.queue print(q1name, q2name) #綁定binding channel.queue_bind(exchange='logs', queue=q1name) channel.queue_bind(exchange='logs', queue=q2name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): #print(ch, method, properties, body) print('Received: {}'.format(body)) channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) channel.basic_consume(queue=q2name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
先運行消費者代碼後,logs交換機被建立,相應的queue被建立
再運行生產者代碼,輸出以下:
amq.gen-_pqdfOYPRb0kvKgLBEZSCw amq.gen-PUV29dEM8j2EKYjcmS78BQ [*] Waiting for logs. To exit press CTRL+C Received: b'data00' Received: b'data00' Received: b'data01' Received: b'data01' Received: b'data02' Received: b'data02' Received: b'data03' Received: b'data03' Received: b'data04' Received: b'data04' ... Received: b'data37' Received: b'data37' Received: b'data38' Received: b'data38' Received: b'data39' Received: b'data39'
每個消息都打印了兩次,在實際生產環境中,若是一個生產者的數據有可能多個業務模塊都須要獲取,那能夠採起此種模式,只要在該業務模塊中指定相應的交換機,本身生成一個queue來緩存相應的數據便可。
若是先啓動了生產者,接着啓動消費者,那部分數據會被丟失。因沒有queue來存儲數據,exchange收到數據後就丟掉。
Routing模型就是數據發送到交換機後根據規則(routing_key)進行路由發送。該模型下交換機類型爲direct
。
生產者代碼:
import pika import time import random #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 x_name = 'color' #交換機名稱 colors = ('orange', 'red', 'green', 'black') # routing_key # 定義交換機及類型 channel.exchange_declare(exchange=x_name, #交換機名稱 exchange_type='direct', #路由 ) for i in range(20): rk = colors[random.randint(0, 3)] channel.basic_publish( exchange=x_name, routing_key=rk, body='data_{}_{:02}'.format(rk, i) ) time.sleep(0.2) connection.close()
消費者代碼:
import pika #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 x_name = 'color' #交換機名稱 colors = ('orange', 'red', 'green', 'black') #routing_key #定義交換機 channel.exchange_declare(exchange=x_name, #指定交換機 exchange_type='direct', #指定交換機類型 ) #生成queue result1 = channel.queue_declare(queue='', exclusive=True) q1name = result1.method.queue print(q1name) #綁定 channel.queue_bind(exchange=x_name, queue=q1name, routing_key=colors[0]) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): # print(ch, method, properties, body) print('Received: {}'.format(body)) channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
先運行消費者,再運行生產者,消費者的終端輸出
amq.gen-DAOzdpOzfFodaJI9xj6b4Q [*] Waiting for logs. To exit press CTRL+C Received: b'data_orange_01' Received: b'data_orange_06' Received: b'data_orange_08' Received: b'data_orange_09' Received: b'data_orange_17' Received: b'data_orange_18'
只有routing_key=colors[0]
即爲orange
的消息才被路由到了消費都定義的queue上後被消費者獲取。
在web界面中也能查看到相應的交換機、queue、routing_key間的綁定關係
多重綁定
如圖,若是一個routing_key被屢次綁定,那和fanout
模式就相似了,但又有不一樣,fanout
時exchange不作數據過慮,而direct
時仍然會作數據過濾這個動做,只是過濾後會把相應的消息發送到多個隊列中。
Topic的routing_key必須使用.
點號 分割的單詞組成。支持通配符:
* 表示嚴格的一個單詞 # 表示0個或多個單詞
若是queue綁定的routing_key只是一個#
,這個queue其實能夠接收全部消息,相似於fanout
若是沒有使用任何通配符,效果相似於direct。
生產者代碼:
import pika import time import random #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 x_name = 'products' #交換機名稱 product_types = ('phone', 'pc', 'tv') #產品類型 colors = ('red', 'green', 'blue') #定義交換機及類型 channel.exchange_declare(exchange=x_name, #交換機名稱 exchange_type='topic', #話題模式 ) for i in range(20): rk = '{}.{}'.format(product_types[random.randint(0, 2)], colors[random.randint(0, 2)]) msg = 'data_{}_{:02}'.format(rk, i) channel.basic_publish(exchange=x_name, routing_key=rk, body=msg ) time.sleep(0.2) connection.close()
消費者代碼:
import pika #建立鏈接 parameters = pika.URLParameters('amqp://admin:admin123@172.16.152.130:5672/test_vh') connection = pika.BlockingConnection(parameters) channel = connection.channel() #通連上建立一個通道 x_name = 'products' #交換機名稱 topics = ('phone.*', '*.red') #定義交換機 channel.exchange_declare(exchange=x_name, #指定交換機 exchange_type='topic', #指定交換機類型 ) #生成queue q1 = channel.queue_declare(queue='', exclusive=True) q1name = q1.method.queue #綁定 channel.queue_bind(exchange=x_name, queue=q1name, routing_key=topics[0]) #修改routing_key後再測試 def callback(ch, method, properties, body): #print(ch, method, properties, body) print('Received: {}'.format(body)) #消費 channel.basic_consume(queue=q1name, on_message_callback=callback, auto_ack=True) #循環取隊列數據 print('Waiting for message. To exit press CTRL+C') channel.start_consuming()
先運行消費者,再運行生產者,在消費者程序的終端中輸出:
Waiting for message. To exit press CTRL+C Received: b'data_phone.blue_00' Received: b'data_phone.red_01' Received: b'data_phone.green_02' Received: b'data_phone.green_03' Received: b'data_phone.blue_06' Received: b'data_phone.blue_08' Received: b'data_phone.green_10' Received: b'data_phone.blue_16'
符合topics[0],即phone.*
的匹配模式。
Topic實質上也是direct,只是支持模式匹配而已。
RPC和Publisher Confirms使用較少,不作說明。
參考:
https://www.rabbitmq.com/install-rpm.html#downloads
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://www.rabbitmq.com/tutorials/tutorial-two-python.html
https://www.rabbitmq.com/tutorials/tutorial-three-python.html
https://www.rabbitmq.com/tutorials/tutorial-four-python.html
https://www.rabbitmq.com/tutorials/tutorial-five-python.html