11. Python Basics (RabbitMQ Queues)

RabbitMQ Queues

Environment dependencies:

Erlang installation guide: (https://blog.csdn.net/guowenyan001/article/details/47951369)

The Erlang installer must be run under an administrator account; otherwise the registry entries required by the RabbitMQ installer will not exist.

  Download the installation package:

  Windows: http://www.erlang.org/downloads

  CentOS: https://www.erlang-solutions.com/resources/download.html

  Fast mirror for users in China: https://erl.uip6.com/index.html

Install the Python RabbitMQ module

threading queue: for passing data between threads of the same process

processing queue: for passing data between a parent process and its child processes, or among child processes of the same parent

RabbitMQ, by contrast, lets independent applications, even on different machines, exchange messages.
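For comparison, here is a minimal sketch of those two in-process queues using only the standard library (the names are illustrative); RabbitMQ is what you reach for when the communicating programs are unrelated processes, possibly on different machines:

import queue
import threading
import multiprocessing

def thread_worker(q):
    print("thread got:", q.get())    # receives the item put by the main thread

def process_worker(q):
    print("process got:", q.get())   # receives via an OS-level pipe, not shared memory

if __name__ == '__main__':
    tq = queue.Queue()               # only usable between threads of this one process
    threading.Thread(target=thread_worker, args=(tq,)).start()
    tq.put('hello threads')

    pq = multiprocessing.Queue()     # usable between this process and its children
    multiprocessing.Process(target=process_worker, args=(pq,)).start()
    pq.put('hello processes')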

pip install pika
or
easy_install pika
or
from source:

https://pypi.python.org/pypi/pika
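To verify that pika is installed and a broker is reachable, a minimal connect-and-close check (assuming RabbitMQ is already running on localhost with default settings):

import pika

# an exception here means pika is missing or the broker is unreachable
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
print("connected:", connection.is_open)
connection.close()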
 

Implementing the simplest queue communication

Producer side

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
                'localhost'))     # at this point the underlying socket connection has been established
channel = connection.channel()    # open/declare a channel

channel.queue_declare(queue='hello')   # declare a queue

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                      routing_key='hello',   # name of the queue
                      body='Hello World!',
                      )
print(" [x] Sent 'Hello World!'")
connection.close()    # close the connection

Consumer side

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
                'localhost'))
channel = connection.channel()


# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):   # callback function; when it returns, the message counts as processed
    print(" [x] Received %r" % body)

# The call below only registers the consumer; it does not start receiving yet
channel.basic_consume(callback,        # when a message arrives, callback is invoked to handle it
                      queue='hello',
                      no_ack=True)     # no acknowledgment: whether or not the message was fully processed, no ack is sent back to the broker

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   # blocks and keeps receiving messages once started

Why do both the producer and the consumer declare the queue? To avoid an error when the other side has not started yet: if the producer is not running, the consumer simply does nothing and waits for messages to appear on the queue.
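As a side note, if you only want to check whether a queue exists without creating it, queue_declare accepts passive=True. A small sketch (assuming the pika versions used on this page, where the broker responds to a missing queue by closing the channel with a 404):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

try:
    channel.queue_declare(queue='hello', passive=True)   # check only, never create
    print("queue 'hello' exists")
except pika.exceptions.ChannelClosed:
    print("queue 'hello' does not exist")   # the broker closed the channel with a 404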

To connect to a rabbitmq server remotely, you need to configure permissions.

First, create a user on the rabbitmq server:

sudo rabbitmqctl add_user alex alex3714

You also need to set permissions to allow access from outside:

sudo rabbitmqctl set_permissions -p / alex ".*" ".*" ".*"

set_permissions [-p vhost] {user} {conf} {write} {read}

vhost: the name of the virtual host to which to grant the user access, defaulting to /.

user: the name of the user to grant access to the specified virtual host.

conf: a regular expression matching resource names for which the user is granted configure permissions.

write: a regular expression matching resource names for which the user is granted write permissions.

read: a regular expression matching resource names for which the user is granted read permissions.
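For example, a hypothetical read-only monitoring user could be set up like this (a regex that matches nothing, such as "^$", denies that class of permission):

sudo rabbitmqctl add_user monitor monitor123
sudo rabbitmqctl set_permissions -p / monitor "^$" "^$" ".*"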

 

When connecting, the client needs to supply authentication credentials:

credentials = pika.PlainCredentials('alex', 'alex3714')

connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.211.55.5', 5672, '/', credentials))
channel = connection.channel()

Work Queues

In this mode, RabbitMQ dispatches the messages published by the producer (P) to the consumers (C) in turn, round-robin style, much like load balancing.

Producer code

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()

# declare the queue
channel.queue_declare(queue='task_queue')

# In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import sys

message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,   # make message persistent
                      ),
                      )
print(" [x] Sent %r" % message)
connection.close()

  

Consumer code

#_*_coding:utf-8_*_

import pika, time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)   # the consumer must acknowledge explicitly; only then will the broker remove the message


channel.basic_consume(callback,
                      queue='task_queue',
                      # note: no no_ack=True here; with it set, the manual basic_ack above would be pointless
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

  

Now start the producer first, then start three consumers, and have the producer send several messages. You will see the messages being distributed to the consumers in turn.

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer, it immediately removes it from memory. In this case, if you kill a worker, we lose the message it was just processing. We also lose all the messages that were dispatched to this particular worker but not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It's time to remove this flag and send a proper acknowledgment from the worker, once we're done with a task.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies, all unacknowledged messages will be redelivered.

Message persistence

We have learned how to make sure that even if the consumer dies, the task isn't lost (this is the default; to disable it, use no_ack=True). But our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

  channel.queue_declare(queue='hello', durable=True)  # durable must be declared on both the producer and the consumer side; this persists only the queue itself, not the messages in it

 

Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:

  channel.queue_declare(queue='task_queue', durable=True)

  

This queue_declare change needs to be applied to both the producer and consumer code.

At that point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with the value 2.

  channel.basic_publish(exchange='',
                        routing_key='task_queue',
                        body=message,
                        properties=pika.BasicProperties(
                            delivery_mode=2,   # make message persistent
                        ))
# together with the durable queue, both the queue and the messages in it are now persisted

Fair dispatch

If RabbitMQ simply handed messages to consumers in order, without considering their load, a consumer on a weak machine could easily accumulate a backlog it cannot work off while a well-provisioned consumer stays nearly idle. To solve this, configure prefetch_count=1 on each consumer, which tells RabbitMQ not to dispatch a new message to this consumer until its current message has been processed.

channel.basic_qos(prefetch_count=1)

 

Complete code with message persistence + fair dispatch

Producer side

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

Consumer side

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)   # fair dispatch takes only this one line
channel.basic_consume(callback,
                      queue='task_queue',
                      )
channel.start_consuming()

Adding channel.basic_qos(prefetch_count=1) on the consumer side is all it takes to get fair message distribution.

Publish\Subscribe (message publish/subscribe)

The examples so far have essentially been one-to-one: each message is sent to one specified queue. Sometimes, though, you want a message to be received by all queues, broadcast-style. That is where exchanges come in.

An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.

An exchange is given a type when it is declared; the type determines which queues qualify to receive a message:

fanout: every queue bound to this exchange receives the message
direct: only the unique queue determined by the routing key and the exchange receives the message
topic: every queue whose binding key matches the message's routing key (the binding key can be a pattern here) receives the message

Pattern syntax:

  • # matches zero or more words; e.g. #.a matches a.a, aa.a, aaa.a, and even a by itself
  • * matches exactly one word; e.g. *.a matches a.a, b.a, c.a, and so on
  • (a "word" is one dot-separated segment of the routing key)

Note: using a binding key of # with exchange type topic is equivalent to fanout.
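To make these rules concrete, here is a small illustrative matcher in plain Python; it only demonstrates the semantics described above (the real matching happens inside RabbitMQ):

def topic_matches(binding_key, routing_key):
    """Illustrative matcher: '*' = exactly one word, '#' = zero or more words."""
    def match(pat, key):
        if not pat:
            return not key                      # pattern used up: match only if the key is too
        head, rest = pat[0], pat[1:]
        if head == '#':                         # '#' may swallow zero or more words
            return any(match(rest, key[i:]) for i in range(len(key) + 1))
        return bool(key) and head in ('*', key[0]) and match(rest, key[1:])
    return match(binding_key.split('.'), routing_key.split('.'))

assert topic_matches('*.a', 'b.a')
assert not topic_matches('*.a', 'b.c.a')
assert topic_matches('#.a', 'aaa.a')
assert topic_matches('kern.*', 'kern.critical')
assert topic_matches('#', 'anything.at.all')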

headers: uses the message headers to decide which queues receive the message
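The headers type is not used again below; for completeness, here is a minimal sketch of what it might look like, written in the same pika style as the rest of this page (the exchange and header names are made up; 'x-match': 'all' requires every listed header to match, 'any' requires at least one):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='headers_logs', type='headers')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# bind on header values instead of a routing key
channel.queue_bind(exchange='headers_logs',
                   queue=queue_name,
                   arguments={'x-match': 'all', 'source': 'kern', 'level': 'critical'})

# the publisher attaches headers to the message; the routing key is ignored
channel.basic_publish(exchange='headers_logs',
                      routing_key='',
                      body='A critical kernel error',
                      properties=pika.BasicProperties(
                          headers={'source': 'kern', 'level': 'critical'}))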

Message publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

Message subscriber

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)   # with no queue name given, RabbitMQ assigns a random one; exclusive=True deletes the queue once its consumer disconnects
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

  

Receiving messages selectively (exchange type=direct)

RabbitMQ also supports routing by keyword: queues are bound to the exchange with a keyword, the sender publishes the message with a routing key, and the exchange uses that key to decide which queue(s) the message is delivered to.

publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

subscriber 

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
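An example run (assuming the two scripts are saved as receive_logs_direct.py and emit_log_direct.py, the file names used in the official tutorial): subscribe to one or more severities in one terminal, then publish from another:

python receive_logs_direct.py info warning error

python emit_log_direct.py error "Run. Run. Or it will explode."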


Finer-grained message filtering (exchange type=topic)

Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

publisher

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

subscriber

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

To receive all the logs run:

python receive_logs_topic.py "#"

Note: a binding key of "#" means the consumer receives all messages.

To receive all logs from the facility "kern":

python receive_logs_topic.py "kern.*"

Or if you want to hear only about "critical" logs:

python receive_logs_topic.py "*.critical"

You can create multiple bindings:

python receive_logs_topic.py "kern.*" "*.critical" 

And to emit a log with a routing key "kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

  

Remote procedure call (RPC)

To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

RPC server

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    # send the result back on the client's callback queue, tagged with the
    # same correlation_id so the client can match the reply to its request
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

RPC client

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())   # sent along with the request so that, under concurrency, replies can be matched to requests and not mixed up
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   # non-blocking alternative to start_consuming: returns whether or not a message has arrived
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)