Rabbitmq--topic
1、前言
前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定: 前端
- routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
- binding key與routing key同樣也是句點號「. 」分隔的字符串
- binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)
以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。python
2、Exchange topic
topic 和 direct 改動很少,就是routing key 和bind key 須要改一下mysql
生產端:nginx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# -*- coding: UTF-8 -*-
import
pika
# 建立一個鏈接
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'localhost'
))
# 建立一個管道
channel
=
connection.channel()
# 聲明exchange 及類型
channel.exchange_declare(exchange
=
'topic_log'
,
exchange_type
=
'topic'
)
# 輸入信息,格式爲 *.info from *.info test 相似
input_data
=
input
(
'>>:'
).strip()
# 將輸入的信息以空格爲分割,轉換爲列表
data_list
=
input_data.split(
' '
)
# 三元運算,若是輸入信息存在,就使用輸入的信息data_list[0],不然用 'anonymous.info'
severity
=
data_list[
0
]
if
len
(data_list) >
1
else
'anonymous.info'
message
=
' '
.join(data_list[
2
:])
or
'hello,world!'
# 這裏的routing_key就是 data_list[0] 或 'info'
channel.basic_publish(exchange
=
'topic_log'
,
routing_key
=
severity,
body
=
message)
print
(
'[x] Sent %r:%r'
%
(severity, message))
connection.close()
|
消費端:git
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# -*- coding: UTF-8 -*-
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
host
=
'localhost'
))
channel
=
connection.channel()
# 聲明exchange 及類型
channel.exchange_declare(exchange
=
'topic_log'
,
exchange_type
=
'topic'
)
result
=
channel.queue_declare(exclusive
=
True
)
queue_name
=
result.method.queue
# 在此咱們定義一些列表,列表內容以下
# 這2個列表分別用來測試和routing_key匹配狀況
# 第一種只容許接收info的信息
# 第二種容許接收error 和 mysql的信息
# severities = ['*.info']
severities
=
[
'*.error'
,
'mysql.*'
]
for
severity
in
severities:
channel.queue_bind(exchange
=
'topic_log'
,
queue
=
queue_name,
routing_key
=
severity)
print
(
' [*] Waiting for logs. To exit press CTRL+C'
)
def
callback(ch, method, properties, body):
print
(
" [x] %r:%r"
%
(method.routing_key, body))
channel.basic_consume(callback,
queue
=
queue_name,
no_ack
=
True
)
channel.start_consuming()
|
咱們測試時,分別啓動兩個consumer。程序員
第一個consumer1 中使用 severities = ['*.info']web
第二個consumer2中使用 severities = ['*.error', 'mysql.*']面試
生產者分別輸入: sql
1
2
3
4
5
|
appache.info
from
appache info test
nginx.error
from
nginx error test
mysql.info
from
mysql info test
|
能夠看到日誌信息分別會彙總到兩個consumer中, 其中 consumer1 會收到 appache.info 和 mysql.info的信息, 而 consumer2 會收到 nginx.error 和 mysql.info 的信息。
【前端】SpreadJS表格控件,可嵌入系統開發的在線Excel
【培訓】阿里P8面試官:什麼樣的人能進阿里
【推薦】程序員問答平臺,解決您開發中遇到的技術難題
· rabbit channel參數
· RabbitMQ探索之路(一):RabbitMQ簡介
· Rabbitmq-topic演示
· RabbitMQ消息隊列(二)-RabbitMQ消息隊列架構與基本概念
· 消息隊列RabbitMQ
· 265年,571種植物滅絕
· 爲我國光子集成注入強「芯」劑
· 投資人對鋰礦沒了興趣,電動汽車行業受影響?
· 量子計算機的性能什麼時候能超越傳統計算機?
· 一初創公司開發廉價自動駕駛傳感器:最低不到500美圓
» 更多新聞...