python之rabbitMQ篇

1、RabbitMQ安裝

 RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統,它遵循Mozilla Pulic License開源協議。python

MQ全稱爲Message Queue,消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用連接來連接它們。消息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。後端

1.yum安裝rabbitmq

1
2
3
4
5
6
7
8
9
10
11
#安裝配置epel源
   rpm - ivh http: / / dl.fedoraproject.org / pub / epel / 6 / i386 / epel - release - 6 - 8.noarch .rpm
 
#安裝Erlang
   yum - y insatll erlang
 
#安裝RabbitMQ
   yum - y install rabbitmq - server
 
#注意:
    service rabbitmq - server start / stop

2,安裝API

1
2
3
4
5
#pip安裝:
   pip install pika
 
#源碼安裝:
   https: / / pypi.python.org / pypi / pika  #官網地址

    以前咱們在介紹線程,進程的時候介紹過python中自帶的隊列用法,下面咱們經過一段代碼複習一下:服務器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#生產者消費者模型,解耦的意思就是兩個程序之間,互相沒有關聯了,互不影響。
import  queue
import  threading
import  time
q =  queue.Queue( 20 )      #隊列裏最多存放20個元素
  
def  productor(arg):            #生成者,建立30個線程來請求吃包子,往隊列裏添加請求元素
     q.put( str (arg) +  '- 包子' )
  
for  i in  range ( 30 ):
     t =  threading.Thread(target = productor,args = (i,))
     t.start()
  
def  consumer(arg):       #消費者,接收到隊列請求之後開始生產包子,來消費隊列裏的請求
     while  True :
         print (arg,q.get())
         time.sleep( 2 )
  
for  j in  range ( 3 ):
     t =  threading.Thread(target = consumer,args = (j,))
     t.start()

2、經過Python來操做RabbitMQ隊列

     上面咱們已經將環境裝備好,下面咱們經過Pika模塊來對Rabbitmq隊列來進行操做,對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。併發

1,基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
####################################生產者#####################################
 
import  pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' )) 
#建立一個連接對象,對象中綁定rabbitmq的IP地址
 
 
channel = connection.channel()        #建立一個頻道
 
channel.queue_declare(queue = 'name1' #經過這個頻道來建立隊列,若是MQ中隊列存在忽略,沒有則建立
 
channel.basic_publish(exchange = '',
                       routing_key = 'name1' ,   #指定隊列名稱
                       body = 'Hello World!' )   #往該隊列中發送一個消息
print ( " [x] Sent 'Hello World!'" )
connection.close()                           #發送完關閉連接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#####################################消費者######################################
 
import  pika
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' ))
#建立一個連接對象,對象中綁定rabbitmq的IP地址
 
channel =  connection.channel()         #建立一個頻道
 
channel.queue_declare(queue = 'name1' )   #經過這個頻道來建立隊列,若是MQ中隊列存在忽略,沒有則建立
 
def  callback(ch, method, properties, body):   #callback函數負責接收隊列裏的消息
     print ( " [x] Received %r"  %  body)
 
channel.basic_consume(callback,              #從隊列裏去消息
                       queue = 'name1' ,         #指定隊列名
                       no_ack = True )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

acknowledgment 消息不丟失函數

   上面的例子中若是咱們將no-ack=False ,那麼當消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼RabbitMQ會從新將該任務添加到隊列中。fetch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import  pika
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' ))
channel =  connection.channel()
 
channel.queue_declare(queue = 'name1' )
 
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
     import  time
     time.sleep( 10 )
     print ( 'ok' )
     ch.basic_ack(delivery_tag =  method.delivery_tag)   #向生成者發送消費完畢的確認信息,而後生產者將此條消息同隊列裏剔除
 
channel.basic_consume(callback,
                       queue = 'name1' ,                             
                      no_ack = False )                     #若是no_ack=False,當消費者down掉了,RabbitMQ會從新將該任務添加到隊列中
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

  上例若是消費者中斷後若是不超過10秒,從新連接的時候數據還在。當超過10秒以後,消費者往生產者發送了ack,從新連接的時候數據將消失。
spa

durable消息不丟失線程

    消費者down掉後咱們知道怎麼處理了,若是個人RabbitMQ服務down掉了該怎麼辦呢?3d

消息隊列是能夠作持久化,若是咱們在生產消息的時候就指定某條消息須要作持久化,那麼RabbitMQ發現有問題時,就會將消息保存到硬盤,持久化下來。code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
####################################生產者#####################################
#!/usr/bin/env python
  
import  pika
  
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' ))
  
channel =  connection.channel()
  
channel.queue_declare(queue = 'name2' , durable = True )    #指定隊列持久化
  
channel.basic_publish(exchange = '',
                       routing_key = 'name2' ,
                       body = 'Hello World!' ,
                       properties = pika.BasicProperties(
                           delivery_mode = 2 ,            #指定消息持久化
                       ))
print ( " [x] Sent 'Hello World!'" )
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#####################################消費者######################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import  pika
  
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.10.131' ))
  
channel =  connection.channel()
  
channel.queue_declare(queue = 'name2' , durable = True )
  
  
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
     import  time
     time.sleep( 10 )
     print ( 'ok' )
     ch.basic_ack(delivery_tag =  method.delivery_tag)
  
channel.basic_consume(callback,
                       queue = 'name2' ,
                       no_ack = False )
  
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

消息獲取順序

    默認消息隊列裏的數據是按照順序被消費者拿走的,例如:消費者1去隊列中獲取奇數序列任務,消費者2去隊列中獲取偶數序列的任務,消費者1處理的比較快而消費者2處理的比較慢,那麼消費者1就會一直處於繁忙的狀態,爲了解決這個問題在須要加入下面代碼:

channel.basic_qos(prefetch_count=1)  :表示誰來獲取,再也不按照奇偶數 排列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import  pika
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.queue_declare(queue = 'name1' )
 
 
def  callback(ch, method, properties, body):
     print ( " [x] Received %r"  %  body)
     import  time
     time.sleep( 10 )
     print  'ok'
     ch.basic_ack(delivery_tag =  method.delivery_tag)
 
channel.basic_qos(prefetch_count = 1 )
 
channel.basic_consume(callback,
                       queue = 'name1' ,
                       no_ack = False )
 
print ( ' [*] Waiting for messages. To exit press CTRL+C' )
channel.start_consuming()

2,發佈訂閱

    發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,二發佈者發佈消息時,會將消息放置在全部相關隊列中。

    在RabbitMQ中,全部生產者提交的消息都有Exchange來接收,而後Exchange按照特定的策略轉發到Queue進行存儲,RabbitMQ提供了四種Exchange:fanout、direct、topic、header。因爲header模式在實際工做中用的比較少,下面主要對前三種進行比較。

exchange type = fanout :任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上

  ​爲了方便理解,應用了上面這張圖,能夠清晰的看到相互之間的關係,當咱們設置成fanout模式時,如何操做請看下面代碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
####################################發佈者#####################################
import  pika
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'test_fanout' ,
                          type = 'fanout' )
 
message =  '4456'
channel.basic_publish(exchange = 'test_fanout' ,
                       routing_key = '',
                       body = message)
print ( ' [x] Sent %r'  %  message)
connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
####################################訂閱者#####################################
 
import  pika
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'test_fanout' ,         #建立一個exchange
                          type = 'fanout' )                  #任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的全部Queue上
 
#隨機建立隊列
result =  channel.queue_declare(exclusive = True )
queue_name =  result.method.queue
 
#綁定
channel.queue_bind(exchange = 'test_fanout' ,
                    queue = queue_name)                    #exchange綁定後端隊列
 
print ( '<------------->' )
 
def  callback(ch,method,properties,body):
     print ( ' [x] %r'  %  body)
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
channel.start_consuming()

exchange type = direct:任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue上(關鍵字發送)

   以前事例,發送消息時明確指定了某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據關鍵字發送到消息Exchange,Exchange根據關鍵字斷定應該將數據發送至指定隊列。

 

 發佈者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import  pika
import  sys
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'direct_test' ,
                          type = 'direct' )
 
severity =  'info'          #設置一個key,
message =  '99999'
channel.basic_publish(exchange = 'direct_test' ,
                       routing_key = severity,
                       body = message)
print ( " [x] Sent %r:%r"  %  (severity, message))
connection.close()

 ​訂閱者1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
import  pika
import  sys
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'direct_test' ,
                          type = 'direct' )
 
result =  channel.queue_declare(exclusive = True )
queue_name =  result.method.queue
 
severities =  [ 'error' , 'info' ,]       #綁定隊列,併發送關鍵字error,info
for  severity in  severities:
     channel.queue_bind(exchange = 'direct_test' ,
                        queue = queue_name,
                        routing_key = severity)
 
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
 
def  callback(ch, method, properties, body):
     print ( " [x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

訂閱者2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#!/usr/bin/env python
import  pika
import  sys
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'direct_test' ,
                          type = 'direct' )
 
result =  channel.queue_declare(exclusive = True )
queue_name =  result.method.queue
 
severities =  [ 'error' ,]
for  severity in  severities:
     channel.queue_bind(exchange = 'direct_test' ,
                        queue = queue_name,
                        routing_key = severity)
 
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
 
def  callback(ch, method, properties, body):
     print ( " [x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()

    結論:當咱們將發佈者的key設置成Error的時候兩個隊列對能夠收到Exchange的消息,當咱們將key設置成info後,只有訂閱者1能夠收到Exchange的消息。

  exchange type = topic:任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上(模糊匹配)

在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入"路由值"和"關鍵字"進行匹配,匹配成功,則將數據發送到指定隊列。

  • # :表示能夠匹配0個或多個單詞;

  • * :表示只能匹配一個單詞。

1
2
3
4
5
#發送路由值        隊列中
 
www.cnblogs.com    www. *  - - - > #沒法匹配
 
www.cnblogs.com    www. # --->#匹配成功

發佈者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
import  pika
import  sys
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'topic_logs' ,
                          type = 'topic' )
 
routing_key =  sys.argv[ 1 ] if  len (sys.argv) > 1  else  'anonymous.info'
 
message =  ' ' .join(sys.argv[ 2 :]) or  'Hello World!'
 
channel.basic_publish(exchange = 'topic_logs' ,
                       routing_key = routing_key,
                       body = message)
print ( " [x] Sent %r:%r"  %  (routing_key, message))
 
connection.close()
 
 
#執行方式:
python xxx.py name1   #name1爲routing_key

訂閱者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
import  pika
import  sys
 
connection =  pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost' ))
 
channel =  connection.channel()
 
channel.exchange_declare(exchange = 'topic_logs' ,
                          type = 'topic' )
 
result =  channel.queue_declare(exclusive = True )
queue_name =  result.method.queue
 
binding_keys =  sys.argv[ 1 :]
if  not  binding_keys:
     sys.stderr.write( "Usage: %s [binding_key]...\n"  %  sys.argv[ 0 ])
     sys.exit( 1 )
 
for  binding_key in  binding_keys:
     channel.queue_bind(exchange = 'topic_logs' ,
                        queue = queue_name,
                        routing_key = binding_key)
 
print ( ' [*] Waiting for logs. To exit press CTRL+C' )
 
def  callback(ch, method, properties, body):
     print ( " [x] %r:%r"  %  (method.routing_key, body))
 
channel.basic_consume(callback,
                       queue = queue_name,
                       no_ack = True )
 
channel.start_consuming()
 
#執行方式:
python xxx,py name1
相關文章
相關標籤/搜索