Rabbitmq--topic Rabbitmq--topic

 

Rabbitmq--topic

 

 

1、前言

  前面講到direct類型的Exchange路由規則是徹底匹配binding key與routing key,但這種嚴格的匹配方式在不少狀況下不能知足實際業務需求。topic類型的Exchange在匹配規則上進行了擴展,它與direct類型的Exchage類似,也是將消息路由到binding key與routing key相匹配的Queue中,但這裏的匹配規則有些不一樣,它約定: 前端

  • routing key爲一個句點號「. 」分隔的字符串(咱們將被句點號「. 」分隔開的每一段獨立的字符串稱爲一個單詞),如「stock.usd.nyse」、「nyse.vmw」、「quick.orange.rabbit」
  • binding key與routing key同樣也是句點號「. 」分隔的字符串
  • binding key中能夠存在兩種特殊字符「*」與「#」,用於作模糊匹配,其中「*」用於匹配一個單詞,「#」用於匹配多個單詞(能夠是零個)

  

  以上圖中的配置爲例,routingKey=」quick.orange.rabbit」的消息會同時路由到Q1與Q2,routingKey=」lazy.orange.fox」的消息會路由到Q1與Q2,routingKey=」lazy.brown.fox」的消息會路由到Q2,routingKey=」lazy.pink.rabbit」的消息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=」quick.brown.fox」、routingKey=」orange」、routingKey=」quick.orange.male.rabbit」的消息將會被丟棄,由於它們沒有匹配任何bindingKey。python

 2、Exchange topic

  topic 和 direct 改動很少,就是routing key 和bind key 須要改一下mysql

  生產端:nginx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- coding: UTF-8 -*-
 
 
import  pika
 
# 建立一個鏈接
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = 'localhost' ))
 
# 建立一個管道
channel  =  connection.channel()
 
# 聲明exchange 及類型
channel.exchange_declare(exchange = 'topic_log' ,
                          exchange_type = 'topic' )
 
# 輸入信息,格式爲 *.info from *.info test 相似
input_data  =  input ( '>>:' ).strip()
 
# 將輸入的信息以空格爲分割,轉換爲列表
data_list  =  input_data.split( ' ' )
 
# 三元運算,若是輸入信息存在,就使用輸入的信息data_list[0],不然用 'anonymous.info'
severity  =  data_list[ 0 if  len (data_list) >  1  else  'anonymous.info'
 
message  =  ' ' .join(data_list[ 2 :])  or  'hello,world!'
 
# 這裏的routing_key就是 data_list[0] 或 'info'
channel.basic_publish(exchange = 'topic_log' ,
                       routing_key = severity,
                       body = message)
print ( '[x] Sent %r:%r'  %  (severity, message))
 
connection.close()

  消費端:git

  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# -*- coding: UTF-8 -*-
 
import  pika
 
connection  =  pika.BlockingConnection(pika.ConnectionParameters(
     host = 'localhost' ))
 
channel  =  connection.channel()
 
# 聲明exchange 及類型
channel.exchange_declare(exchange = 'topic_log' ,
                          exchange_type = 'topic' )
 
result  =  channel.queue_declare(exclusive = True )
 
queue_name  =  result.method.queue
 
# 在此咱們定義一些列表,列表內容以下
# 這2個列表分別用來測試和routing_key匹配狀況
# 第一種只容許接收info的信息
# 第二種容許接收error 和 mysql的信息
 
 
# severities = ['*.info']
severities  =  [ '*.error' 'mysql.*' ]
 
for  severity  in  severities:
     channel.queue_bind(exchange = 'topic_log' ,
                        queue = queue_name,
                        routing_key = severity)
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
 
 
def  callback(ch, method, properties, body):
     print ( " [x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

  咱們測試時,分別啓動兩個consumer。程序員

  第一個consumer1 中使用   severities = ['*.info']web

  第二個consumer2中使用   severities = ['*.error', 'mysql.*']面試

   生產者分別輸入:   sql

1
2
3
4
5
appache.info  from  appache info test
 
nginx.error  from  nginx error test
 
mysql.info  from  mysql info test

  能夠看到日誌信息分別會彙總到兩個consumer中, 其中 consumer1 會收到 appache.info 和 mysql.info的信息, 而 consumer2 會收到 nginx.error 和 mysql.info 的信息。

 
分類:  網絡編程進階
 
好文要頂  關注我  收藏該文   
0
0
 
 
 
« 上一篇: Rabbitmq -- direct
» 下一篇: Rabbitmq -- rpc
posted @  2018-01-08 10:24 Bigberg 閱讀(474) 評論(0) 編輯 收藏
 
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息